diff --git a/Sources/Auth/AuthClient.swift b/Sources/Auth/AuthClient.swift index 3ac166a1..37a1abea 100644 --- a/Sources/Auth/AuthClient.swift +++ b/Sources/Auth/AuthClient.swift @@ -106,7 +106,9 @@ public final class AuthClient: Sendable { /// - Parameters: /// - email: User's email address. /// - password: Password for the user. - /// - data: User's metadata. + /// - data: Custom data object to store additional user metadata. + /// - redirectTo: The redirect URL embedded in the email link, defaults to ``Configuration/redirectToURL`` if not provided. + /// - captchaToken: Optional captcha token for securing this endpoint. @discardableResult public func signUp( email: String, @@ -145,7 +147,8 @@ public final class AuthClient: Sendable { /// - Parameters: /// - phone: User's phone number with international prefix. /// - password: Password for the user. - /// - data: User's metadata. + /// - data: Custom data object to store additional user metadata. + /// - captchaToken: Optional captcha token for securing this endpoint. @discardableResult public func signUp( phone: String, @@ -184,6 +187,10 @@ public final class AuthClient: Sendable { } /// Log in an existing user with an email and password. + /// - Parameters: + /// - email: User's email address. + /// - password: User's password. + /// - captchaToken: Optional captcha token for securing this endpoint. @discardableResult public func signIn( email: String, @@ -207,6 +214,10 @@ public final class AuthClient: Sendable { } /// Log in an existing user with a phone and password. + /// - Parameters: + /// - email: User's phone number. + /// - password: User's password. + /// - captchaToken: Optional captcha token for securing this endpoint. @discardableResult public func signIn( phone: String, @@ -333,8 +344,7 @@ public final class AuthClient: Sendable { /// - data: User's metadata. /// - captchaToken: Captcha verification token. /// - /// - Note: You need to configure a WhatsApp sender on Twillo if you are using phone sign in with - /// the `whatsapp` channel. + /// - Note: You need to configure a WhatsApp sender on Twillo if you are using phone sign in with the `whatsapp` channel. public func signInWithOTP( phone: String, channel: MessagingChannel = .sms, @@ -362,8 +372,7 @@ public final class AuthClient: Sendable { /// Attempts a single-sign on using an enterprise Identity Provider. /// - Parameters: /// - domain: The email domain to use for signing in. - /// - redirectTo: The URL to redirect the user to after they sign in with the third-party - /// provider. + /// - redirectTo: The URL to redirect the user to after they sign in with the third-party provider. /// - captchaToken: The captcha token to be used for captcha verification. /// - Returns: A URL that you can use to initiate the provider's authentication flow. public func signInWithSSO( diff --git a/Sources/Realtime/V2/PushV2.swift b/Sources/Realtime/V2/PushV2.swift index 70a62edd..e5bc9307 100644 --- a/Sources/Realtime/V2/PushV2.swift +++ b/Sources/Realtime/V2/PushV2.swift @@ -20,11 +20,11 @@ actor PushV2 { } func send() async -> PushStatus { - await channel?.socket?.push(message) + await channel?.socket.push(message) if channel?.config.broadcast.acknowledgeBroadcasts == true { do { - return try await withTimeout(interval: channel?.socket?.options.timeoutInterval ?? 10) { + return try await withTimeout(interval: channel?.socket.options().timeoutInterval ?? 10) { await withCheckedContinuation { self.receivedContinuation = $0 } diff --git a/Sources/Realtime/V2/RealtimeChannelV2.swift b/Sources/Realtime/V2/RealtimeChannelV2.swift index 0df24214..03599f32 100644 --- a/Sources/Realtime/V2/RealtimeChannelV2.swift +++ b/Sources/Realtime/V2/RealtimeChannelV2.swift @@ -14,7 +14,34 @@ public struct RealtimeChannelConfig: Sendable { public var presence: PresenceJoinConfig } -public actor RealtimeChannelV2 { +struct Socket: Sendable { + var status: @Sendable () -> RealtimeClientV2.Status + var options: @Sendable () -> RealtimeClientOptions + var accessToken: @Sendable () -> String? + var makeRef: @Sendable () -> Int + + var connect: @Sendable () async -> Void + var addChannel: @Sendable (_ channel: RealtimeChannelV2) -> Void + var removeChannel: @Sendable (_ channel: RealtimeChannelV2) async -> Void + var push: @Sendable (_ message: RealtimeMessageV2) async -> Void +} + +extension Socket { + init(client: RealtimeClientV2) { + self.init( + status: { [weak client] in client?.status ?? .disconnected }, + options: { [weak client] in client?.options ?? .init() }, + accessToken: { [weak client] in client?.mutableState.accessToken }, + makeRef: { [weak client] in client?.makeRef() ?? 0 }, + connect: { [weak client] in await client?.connect() }, + addChannel: { [weak client] in client?.addChannel($0) }, + removeChannel: { [weak client] in await client?.removeChannel($0) }, + push: { [weak client] in await client?.push($0) } + ) + } +} + +public final class RealtimeChannelV2: Sendable { public typealias Subscription = ObservationToken public enum Status: Sendable { @@ -24,24 +51,22 @@ public actor RealtimeChannelV2 { case unsubscribing } - weak var socket: RealtimeClientV2? { - didSet { - assert(oldValue == nil, "socket should not be modified once set") - } + struct MutableState { + var clientChanges: [PostgresJoinConfig] = [] + var joinRef: String? + var pushes: [String: PushV2] = [:] } + private let mutableState = LockIsolated(MutableState()) + let topic: String let config: RealtimeChannelConfig let logger: (any SupabaseLogger)? + let socket: Socket private let callbackManager = CallbackManager() - private let statusEventEmitter = EventEmitter(initialEvent: .unsubscribed) - private var clientChanges: [PostgresJoinConfig] = [] - private var joinRef: String? - private var pushes: [String: PushV2] = [:] - public private(set) var status: Status { get { statusEventEmitter.lastEvent } set { statusEventEmitter.emit(newValue) } @@ -54,13 +79,13 @@ public actor RealtimeChannelV2 { init( topic: String, config: RealtimeChannelConfig, - socket: RealtimeClientV2, + socket: Socket, logger: (any SupabaseLogger)? ) { - self.socket = socket self.topic = topic self.config = config self.logger = logger + self.socket = socket } deinit { @@ -69,16 +94,16 @@ public actor RealtimeChannelV2 { /// Subscribes to the channel public func subscribe() async { - if await socket?.status != .connected { - if socket?.options.connectOnSubscribe != true { + if socket.status() != .connected { + if socket.options().connectOnSubscribe != true { fatalError( "You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?" ) } - await socket?.connect() + await socket.connect() } - await socket?.addChannel(self) + socket.addChannel(self) status = .subscribing logger?.debug("subscribing to channel \(topic)") @@ -86,15 +111,16 @@ public actor RealtimeChannelV2 { let joinConfig = RealtimeJoinConfig( broadcast: config.broadcast, presence: config.presence, - postgresChanges: clientChanges + postgresChanges: mutableState.clientChanges ) - let payload = await RealtimeJoinPayload( + let payload = RealtimeJoinPayload( config: joinConfig, - accessToken: socket?.accessToken + accessToken: socket.accessToken() ) - joinRef = await socket?.makeRef().description + let joinRef = socket.makeRef().description + mutableState.withValue { $0.joinRef = joinRef } logger?.debug("subscribing to channel with body: \(joinConfig)") @@ -109,7 +135,7 @@ public actor RealtimeChannelV2 { ) do { - try await withTimeout(interval: socket?.options.timeoutInterval ?? 10) { [self] in + try await withTimeout(interval: socket.options().timeoutInterval) { [self] in _ = await statusChange.first { @Sendable in $0 == .subscribed } } } catch { @@ -128,8 +154,8 @@ public actor RealtimeChannelV2 { await push( RealtimeMessageV2( - joinRef: joinRef, - ref: socket?.makeRef().description, + joinRef: mutableState.joinRef, + ref: socket.makeRef().description, topic: topic, event: ChannelEvent.leave, payload: [:] @@ -141,8 +167,8 @@ public actor RealtimeChannelV2 { logger?.debug("Updating auth token for channel \(topic)") await push( RealtimeMessageV2( - joinRef: joinRef, - ref: socket?.makeRef().description, + joinRef: mutableState.joinRef, + ref: socket.makeRef().description, topic: topic, event: ChannelEvent.accessToken, payload: ["access_token": .string(jwt)] @@ -162,8 +188,8 @@ public actor RealtimeChannelV2 { await push( RealtimeMessageV2( - joinRef: joinRef, - ref: socket?.makeRef().description, + joinRef: mutableState.joinRef, + ref: socket.makeRef().description, topic: topic, event: ChannelEvent.broadcast, payload: [ @@ -187,8 +213,8 @@ public actor RealtimeChannelV2 { await push( RealtimeMessageV2( - joinRef: joinRef, - ref: socket?.makeRef().description, + joinRef: mutableState.joinRef, + ref: socket.makeRef().description, topic: topic, event: ChannelEvent.presence, payload: [ @@ -203,8 +229,8 @@ public actor RealtimeChannelV2 { public func untrack() async { await push( RealtimeMessageV2( - joinRef: joinRef, - ref: socket?.makeRef().description, + joinRef: mutableState.joinRef, + ref: socket.makeRef().description, topic: topic, event: ChannelEvent.presence, payload: [ @@ -329,7 +355,7 @@ public actor RealtimeChannelV2 { Task { [weak self] in guard let self else { return } - await socket?.removeChannel(self) + await socket.removeChannel(self) logger?.debug("Unsubscribed from channel \(message.topic)") } @@ -439,7 +465,9 @@ public actor RealtimeChannelV2 { filter: filter ) - clientChanges.append(config) + mutableState.withValue { + $0.clientChanges.append(config) + } let id = callbackManager.addPostgresCallback(filter: config, callback: callback) return Subscription { [weak callbackManager, logger] in @@ -464,14 +492,18 @@ public actor RealtimeChannelV2 { private func push(_ message: RealtimeMessageV2) async -> PushStatus { let push = PushV2(channel: self, message: message) if let ref = message.ref { - pushes[ref] = push + mutableState.withValue { + $0.pushes[ref] = push + } } return await push.send() } private func didReceiveReply(ref: String, status: String) { Task { - let push = pushes.removeValue(forKey: ref) + let push = mutableState.withValue { + $0.pushes.removeValue(forKey: ref) + } await push?.didReceive(status: PushStatus(rawValue: status) ?? .ok) } } diff --git a/Sources/Realtime/V2/RealtimeClientV2.swift b/Sources/Realtime/V2/RealtimeClientV2.swift index 08e219d9..3dd247f4 100644 --- a/Sources/Realtime/V2/RealtimeClientV2.swift +++ b/Sources/Realtime/V2/RealtimeClientV2.swift @@ -15,7 +15,7 @@ import Helpers public typealias JSONObject = Helpers.JSONObject -public actor RealtimeClientV2 { +public final class RealtimeClientV2: Sendable { @available(*, deprecated, renamed: "RealtimeClientOptions") public struct Configuration: Sendable { var url: URL @@ -65,20 +65,25 @@ public actor RealtimeClientV2 { } } + struct MutableState { + var accessToken: String? + var ref = 0 + var pendingHeartbeatRef: Int? + var heartbeatTask: Task? + var messageTask: Task? + var connectionTask: Task? + var subscriptions: [String: RealtimeChannelV2] = [:] + } + let url: URL let options: RealtimeClientOptions let ws: any WebSocketClient - - var accessToken: String? + let mutableState = LockIsolated(MutableState()) let apikey: String? - var ref = 0 - var pendingHeartbeatRef: Int? - - var heartbeatTask: Task? - var messageTask: Task? - var connectionTask: Task? - public private(set) var subscriptions: [String: RealtimeChannelV2] = [:] + public var subscriptions: [String: RealtimeChannelV2] { + mutableState.subscriptions + } private let statusEventEmitter = EventEmitter(initialEvent: .disconnected) @@ -107,7 +112,7 @@ public actor RealtimeClientV2 { } @available(*, deprecated, renamed: "RealtimeClientV2.init(url:options:)") - public init(config: Configuration) { + public convenience init(config: Configuration) { self.init( url: config.url, options: RealtimeClientOptions( @@ -122,7 +127,7 @@ public actor RealtimeClientV2 { ) } - public init(url: URL, options: RealtimeClientOptions) { + public convenience init(url: URL, options: RealtimeClientOptions) { self.init( url: url, options: options, @@ -140,14 +145,19 @@ public actor RealtimeClientV2 { self.url = url self.options = options self.ws = ws - accessToken = options.accessToken ?? options.apikey apikey = options.apikey + + mutableState.withValue { + $0.accessToken = options.accessToken ?? options.apikey + } } deinit { - heartbeatTask?.cancel() - messageTask?.cancel() - subscriptions = [:] + mutableState.withValue { + $0.heartbeatTask?.cancel() + $0.messageTask?.cancel() + $0.subscriptions = [:] + } } /// Connects the socket. @@ -159,7 +169,7 @@ public actor RealtimeClientV2 { func connect(reconnect: Bool) async { if status == .disconnected { - connectionTask = Task { + let connectionTask = Task { if reconnect { try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.reconnectDelay)) @@ -193,6 +203,10 @@ public actor RealtimeClientV2 { } } } + + mutableState.withValue { + $0.connectionTask = connectionTask + } } _ = await statusChange.first { @Sendable in $0 == .connected } @@ -242,21 +256,25 @@ public actor RealtimeClientV2 { return RealtimeChannelV2( topic: "realtime:\(topic)", config: config, - socket: self, + socket: Socket(client: self), logger: self.options.logger ) } public func addChannel(_ channel: RealtimeChannelV2) { - subscriptions[channel.topic] = channel + mutableState.withValue { + $0.subscriptions[channel.topic] = channel + } } public func removeChannel(_ channel: RealtimeChannelV2) async { - if await channel.status == .subscribed { + if channel.status == .subscribed { await channel.unsubscribe() } - subscriptions[channel.topic] = nil + mutableState.withValue { + $0.subscriptions[channel.topic] = nil + } if subscriptions.isEmpty { options.logger?.debug("No more subscribed channel in socket") @@ -277,7 +295,7 @@ public actor RealtimeClientV2 { } private func listenForMessages() { - messageTask = Task { [weak self] in + let messageTask = Task { [weak self] in guard let self else { return } do { @@ -295,10 +313,13 @@ public actor RealtimeClientV2 { await reconnect() } } + mutableState.withValue { + $0.messageTask = messageTask + } } private func startHeartbeating() { - heartbeatTask = Task { [weak self, options] in + let heartbeatTask = Task { [weak self, options] in while !Task.isCancelled { try? await Task.sleep(nanoseconds: NSEC_PER_SEC * UInt64(options.heartbeatInterval)) if Task.isCancelled { @@ -307,36 +328,47 @@ public actor RealtimeClientV2 { await self?.sendHeartbeat() } } + mutableState.withValue { + $0.heartbeatTask = heartbeatTask + } } private func sendHeartbeat() async { - if pendingHeartbeatRef != nil { - pendingHeartbeatRef = nil - options.logger?.debug("Heartbeat timeout") + let pendingHeartbeatRef: Int? = mutableState.withValue { + if $0.pendingHeartbeatRef != nil { + $0.pendingHeartbeatRef = nil + return nil + } - await reconnect() - return + let ref = makeRef() + $0.pendingHeartbeatRef = ref + return ref } - pendingHeartbeatRef = makeRef() - - await push( - RealtimeMessageV2( - joinRef: nil, - ref: pendingHeartbeatRef?.description, - topic: "phoenix", - event: "heartbeat", - payload: [:] + if let pendingHeartbeatRef { + await push( + RealtimeMessageV2( + joinRef: nil, + ref: pendingHeartbeatRef.description, + topic: "phoenix", + event: "heartbeat", + payload: [:] + ) ) - ) + } else { + options.logger?.debug("Heartbeat timeout") + await reconnect() + } } public func disconnect() { options.logger?.debug("Closing WebSocket connection") - ref = 0 - messageTask?.cancel() - heartbeatTask?.cancel() - connectionTask?.cancel() + mutableState.withValue { + $0.ref = 0 + $0.messageTask?.cancel() + $0.heartbeatTask?.cancel() + $0.connectionTask?.cancel() + } ws.disconnect() status = .disconnected } @@ -344,25 +376,29 @@ public actor RealtimeClientV2 { /// Sets the JWT access token used for channel subscription authorization and Realtime RLS. /// - Parameter token: A JWT string. public func setAuth(_ token: String?) async { - accessToken = token + mutableState.withValue { + $0.accessToken = token + } for channel in subscriptions.values { - if let token, await channel.status == .subscribed { + if let token, channel.status == .subscribed { await channel.updateAuth(jwt: token) } } } private func onMessage(_ message: RealtimeMessageV2) async { - let channel = subscriptions[message.topic] - - if let ref = message.ref, Int(ref) == pendingHeartbeatRef { - pendingHeartbeatRef = nil - options.logger?.debug("heartbeat received") - } else { - options.logger? - .debug("Received event \(message.event) for channel \(channel?.topic ?? "null")") - await channel?.onMessage(message) + mutableState.withValue { + let channel = $0.subscriptions[message.topic] + + if let ref = message.ref, Int(ref) == $0.pendingHeartbeatRef { + $0.pendingHeartbeatRef = nil + options.logger?.debug("heartbeat received") + } else { + options.logger? + .debug("Received event \(message.event) for channel \(channel?.topic ?? "null")") + channel?.onMessage(message) + } } } @@ -388,8 +424,10 @@ public actor RealtimeClientV2 { } func makeRef() -> Int { - ref += 1 - return ref + mutableState.withValue { + $0.ref += 1 + return $0.ref + } } static func realtimeBaseURL(url: URL) -> URL { diff --git a/Tests/IntegrationTests/RealtimeIntegrationTests.swift b/Tests/IntegrationTests/RealtimeIntegrationTests.swift index 6c18686b..e768b91c 100644 --- a/Tests/IntegrationTests/RealtimeIntegrationTests.swift +++ b/Tests/IntegrationTests/RealtimeIntegrationTests.swift @@ -40,14 +40,14 @@ final class RealtimeIntegrationTests: XCTestCase { let expectation = expectation(description: "receivedBroadcastMessages") expectation.expectedFulfillmentCount = 3 - let channel = await realtime.channel("integration") { + let channel = realtime.channel("integration") { $0.broadcast.receiveOwnBroadcasts = true } let receivedMessages = LockIsolated<[JSONObject]>([]) Task { - for await message in await channel.broadcastStream(event: "test") { + for await message in channel.broadcastStream(event: "test") { receivedMessages.withValue { $0.append(message) } @@ -101,7 +101,7 @@ final class RealtimeIntegrationTests: XCTestCase { } func testPresence() async throws { - let channel = await realtime.channel("integration") { + let channel = realtime.channel("integration") { $0.broadcast.receiveOwnBroadcasts = true } @@ -111,7 +111,7 @@ final class RealtimeIntegrationTests: XCTestCase { let receivedPresenceChanges = LockIsolated<[any PresenceAction]>([]) Task { - for await presence in await channel.presenceChange() { + for await presence in channel.presenceChange() { receivedPresenceChanges.withValue { $0.append(presence) } @@ -161,7 +161,7 @@ final class RealtimeIntegrationTests: XCTestCase { // FIXME: Test getting stuck // func testPostgresChanges() async throws { -// let channel = await realtime.channel("db-changes") +// let channel = realtime.channel("db-changes") // // let receivedInsertActions = Task { // await channel.postgresChange(InsertAction.self, schema: "public").prefix(1).collect() diff --git a/Tests/RealtimeTests/RealtimeTests.swift b/Tests/RealtimeTests/RealtimeTests.swift index 4f10051a..11662c74 100644 --- a/Tests/RealtimeTests/RealtimeTests.swift +++ b/Tests/RealtimeTests/RealtimeTests.swift @@ -35,30 +35,30 @@ final class RealtimeTests: XCTestCase { ) } - override func tearDown() async throws { - await sut.disconnect() - - try await super.tearDown() + override func tearDown() { + sut.disconnect() + + super.tearDown() } func testBehavior() async throws { try await withTimeout(interval: 2) { [self] in - let channel = await sut.channel("public:messages") - _ = await channel.postgresChange(InsertAction.self, table: "messages") - _ = await channel.postgresChange(UpdateAction.self, table: "messages") - _ = await channel.postgresChange(DeleteAction.self, table: "messages") + let channel = sut.channel("public:messages") + _ = channel.postgresChange(InsertAction.self, table: "messages") + _ = channel.postgresChange(UpdateAction.self, table: "messages") + _ = channel.postgresChange(DeleteAction.self, table: "messages") - let statusChange = await sut.statusChange + let statusChange = sut.statusChange await connectSocketAndWait() let status = await statusChange.prefix(3).collect() XCTAssertEqual(status, [.disconnected, .connecting, .connected]) - let messageTask = await sut.messageTask + let messageTask = sut.mutableState.messageTask XCTAssertNotNil(messageTask) - let heartbeatTask = await sut.heartbeatTask + let heartbeatTask = sut.mutableState.heartbeatTask XCTAssertNotNil(heartbeatTask) let subscription = Task { @@ -75,7 +75,7 @@ final class RealtimeTests: XCTestCase { } func testSubscribeTimeout() async throws { - let channel = await sut.channel("public:messages") + let channel = sut.channel("public:messages") let joinEventCount = LockIsolated(0) ws.on { message in @@ -130,8 +130,8 @@ final class RealtimeTests: XCTestCase { ) ), RealtimeMessageV2( - joinRef: "3", - ref: "3", + joinRef: "2", + ref: "2", topic: "realtime:public:messages", event: "phx_join", payload: JSONObject( @@ -193,7 +193,7 @@ final class RealtimeTests: XCTestCase { let statuses = LockIsolated<[RealtimeClientV2.Status]>([]) Task { - for await status in await sut.statusChange { + for await status in sut.statusChange { statuses.withValue { $0.append(status) } @@ -204,7 +204,7 @@ final class RealtimeTests: XCTestCase { await fulfillment(of: [sentHeartbeatExpectation], timeout: 2) - let pendingHeartbeatRef = await sut.pendingHeartbeatRef + let pendingHeartbeatRef = sut.mutableState.pendingHeartbeatRef XCTAssertNotNil(pendingHeartbeatRef) // Wait until next heartbeat diff --git a/Tests/RealtimeTests/_PushTests.swift b/Tests/RealtimeTests/_PushTests.swift index 26b988be..4cefaca7 100644 --- a/Tests/RealtimeTests/_PushTests.swift +++ b/Tests/RealtimeTests/_PushTests.swift @@ -40,7 +40,7 @@ final class _PushTests: XCTestCase { broadcast: .init(acknowledgeBroadcasts: false), presence: .init() ), - socket: socket, + socket: Socket(client: socket), logger: nil ) let push = PushV2( diff --git a/Tests/SupabaseTests/SupabaseClientTests.swift b/Tests/SupabaseTests/SupabaseClientTests.swift index 7f240fbe..f8df2451 100644 --- a/Tests/SupabaseTests/SupabaseClientTests.swift +++ b/Tests/SupabaseTests/SupabaseClientTests.swift @@ -72,10 +72,10 @@ final class SupabaseClientTests: XCTestCase { XCTAssertEqual(client.functions.region, "ap-northeast-1") - let realtimeURL = await client.realtimeV2.url + let realtimeURL = client.realtimeV2.url XCTAssertEqual(realtimeURL.absoluteString, "https://project-ref.supabase.co/realtime/v1") - let realtimeOptions = await client.realtimeV2.options + let realtimeOptions = client.realtimeV2.options let expectedRealtimeHeader = client.defaultHeaders.merged(with: ["custom_realtime_header_key": "custom_realtime_header_value"]) XCTAssertNoDifference(realtimeOptions.headers, expectedRealtimeHeader) XCTAssertIdentical(realtimeOptions.logger as? Logger, logger)