Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/reconnect-errors
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
patch type="fixed" "Reconnect sequence stuck in failed state"
8 changes: 4 additions & 4 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ jobs:
# https://github.com/actions/runner-images/blob/main/images/macos/macos-26-arm64-Readme.md
- os: macos-26
xcode: latest
platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.1"
platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.2"
symbol-graph: true
- os: macos-26
xcode: latest
platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.1"
platform: "iOS Simulator,name=iPhone 17 Pro,OS=26.2"
extension-api-only: true
- os: macos-26
xcode: latest
Expand All @@ -84,10 +84,10 @@ jobs:
platform: "macOS,variant=Mac Catalyst"
- os: macos-26
xcode: latest
platform: "visionOS Simulator,name=Apple Vision Pro,OS=26.1"
platform: "visionOS Simulator,name=Apple Vision Pro,OS=26.2"
- os: macos-26
xcode: latest
platform: "tvOS Simulator,name=Apple TV,OS=26.1"
platform: "tvOS Simulator,name=Apple TV,OS=26.2"

runs-on: ${{ matrix.os }}
timeout-minutes: 60
Expand Down
26 changes: 19 additions & 7 deletions Sources/LiveKit/Core/Room+Engine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ extension Room {
primary: !isSubscriberPrimary,
delegate: self)

await publisher.set { [weak self] offer in
await publisher.set { [weak self] offer, offerId in
guard let self else { return }
log("Publisher onOffer \(offer.sdp)")
try await signalClient.send(offer: offer)
log("Publisher onOffer with offerId: \(offerId), sdp: \(offer.sdp)")
try await signalClient.send(offer: offer, offerId: offerId)
}

// data over pub channel for backwards compatibility
Expand Down Expand Up @@ -318,7 +318,13 @@ extension Room {

log("[Connect] Waiting for subscriber to connect...")
// Wait for primary transport to connect (if not already)
try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout)
do {
try await primaryTransportConnectedCompleter.wait(timeout: _state.connectOptions.primaryTransportConnectTimeout)
log("[Connect] Subscriber transport connected")
} catch {
log("[Connect] Subscriber transport failed to connect, error: \(error)", .error)
throw error
}
try Task.checkCancellation()

// send SyncState before offer
Expand All @@ -330,7 +336,13 @@ extension Room {
// Only if published, wait for publisher to connect...
log("[Connect] Waiting for publisher to connect...")
try await publisher.createAndSendOffer(iceRestart: true)
try await publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout)
do {
try await publisherTransportConnectedCompleter.wait(timeout: _state.connectOptions.publisherTransportConnectTimeout)
log("[Connect] Publisher transport connected")
} catch {
log("[Connect] Publisher transport failed to connect, error: \(error)", .error)
throw error
}
}
}

Expand Down Expand Up @@ -465,8 +477,8 @@ extension Room {
$0.subscribe = !autoSubscribe
}

try await signalClient.sendSyncState(answer: previousAnswer?.toPBType(),
offer: previousOffer?.toPBType(),
try await signalClient.sendSyncState(answer: previousAnswer?.toPBType(offerId: 0),
offer: previousOffer?.toPBType(offerId: 0),
subscription: subscription,
publishTracks: localParticipant.publishedTracksInfo(),
dataChannels: publisherDataChannel.infos(),
Expand Down
47 changes: 29 additions & 18 deletions Sources/LiveKit/Core/Room+SignalClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,32 @@ extension Room: SignalClientDelegate {
// engine is currently connected state
case .connected = _state.connectionState
{
do {
try await startReconnect(reason: .websocket)
} catch {
log("Failed calling startReconnect, error: \(error)", .error)
Task {
Copy link
Contributor Author

@pblazej pblazej Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SerialRunnerActor (inside SignalClient._delegate)
│
├─> [Task 1] didUpdateConnectionState
│   └─> await startReconnect()  ← blocking the actor
│       └─> waiting for offer...
│
└─> [Task 2] didReceiveOffer
    └─> Can't enter because actor is busy with Task 1

do {
try await startReconnect(reason: .websocket)
} catch {
log("Failed calling startReconnect, error: \(error)", .error)
}
}
}
}

func signalClient(_: SignalClient, didReceiveLeave canReconnect: Bool, reason: Livekit_DisconnectReason) async {
log("canReconnect: \(canReconnect), reason: \(reason)")
func signalClient(_: SignalClient, didReceiveLeave action: Livekit_LeaveRequest.Action, reason: Livekit_DisconnectReason) async {
log("action: \(action), reason: \(reason)")

if canReconnect {
// force .full for next reconnect
let error = LiveKitError.from(reason: reason)
switch action {
case .reconnect:
// Force .full for next reconnect
_state.mutate { $0.nextReconnectMode = .full }
} else {
// Server indicates it's not recoverable
await cleanUp(withError: LiveKitError.from(reason: reason))
fallthrough
case .resume:
// Abort current attempt
await signalClient.cleanUp(withError: error)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is more or less equivalent to JS:

        case LeaveRequest_Action.RECONNECT:
          this.fullReconnectOnNext = true;
          // reconnect immediately instead of waiting for next attempt
          this.handleDisconnect(leaveReconnect);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (if-minor): maybe this is a good moment to adopt the .action of the leave request as well here? (backwards compatible of course in case it's unset/0).

We don't actually need a full reconnect on every leave request. But if it makes more sense as a follow up that also sounds good.

Copy link
Contributor Author

@pblazej pblazej Nov 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, moved to the action param here 👍

Re: backwards compatibility, I discarded the legacy canReconnect param, which mimics JS and no-op if unknown.

case .disconnect:
await cleanUp(withError: error)
default:
log("Unknown leave action: \(action), ignoring", .warning)
}
}

Expand Down Expand Up @@ -319,17 +328,19 @@ extension Room: SignalClientDelegate {
}
}

func signalClient(_: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription) async {
func signalClient(_: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription, offerId: UInt32) async {
log("Received answer for offerId: \(offerId)")

do {
let publisher = try requirePublisher()
try await publisher.set(remoteDescription: answer)
try await publisher.set(remoteDescription: answer, offerId: offerId)
} catch {
log("Failed to set remote description, error: \(error)", .error)
log("Failed to set remote description with offerId: \(offerId), error: \(error)", .error)
}
}

func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) async {
log("Received offer, creating & sending answer...")
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription, offerId: UInt32) async {
log("Received offer with offerId: \(offerId), creating & sending answer...")

guard let subscriber = _state.subscriber else {
log("Failed to send answer, subscriber is nil", .error)
Expand All @@ -340,9 +351,9 @@ extension Room: SignalClientDelegate {
try await subscriber.set(remoteDescription: offer)
let answer = try await subscriber.createAnswer()
try await subscriber.set(localDescription: answer)
try await signalClient.send(answer: answer)
try await signalClient.send(answer: answer, offerId: offerId)
} catch {
log("Failed to send answer with error: \(error)", .error)
log("Failed to send answer for offerId: \(offerId), error: \(error)", .error)
}
}

Expand Down
24 changes: 14 additions & 10 deletions Sources/LiveKit/Core/SignalClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,15 @@ actor SignalClient: Loggable {
participantSid: participantSid,
adaptiveStream: adaptiveStream)

if reconnectMode != nil {
log("[Connect] with url: \(url)")
let isReconnect = reconnectMode != nil

if isReconnect {
log("Reconnecting with url: \(url)")
} else {
log("Connecting with url: \(url)")
}

_state.mutate { $0.connectionState = (reconnectMode != nil ? .reconnecting : .connecting) }
_state.mutate { $0.connectionState = (isReconnect ? .reconnecting : .connecting) }

do {
let socket = try await WebSocket(url: url,
Expand Down Expand Up @@ -279,10 +281,12 @@ private extension SignalClient {
await _restartPingTimer()

case let .answer(sd):
_delegate.notifyDetached { await $0.signalClient(self, didReceiveAnswer: sd.toRTCType()) }
let (rtcDescription, offerId) = sd.toRTCType()
_delegate.notifyDetached { await $0.signalClient(self, didReceiveAnswer: rtcDescription, offerId: offerId) }

case let .offer(sd):
_delegate.notifyDetached { await $0.signalClient(self, didReceiveOffer: sd.toRTCType()) }
let (rtcDescription, offerId) = sd.toRTCType()
_delegate.notifyDetached { await $0.signalClient(self, didReceiveOffer: rtcDescription, offerId: offerId) }

case let .trickle(trickle):
guard let rtcCandidate = try? RTC.createIceCandidate(fromJsonString: trickle.candidateInit) else {
Expand Down Expand Up @@ -315,7 +319,7 @@ private extension SignalClient {
_delegate.notifyDetached { await $0.signalClient(self, didUpdateRemoteMute: Track.Sid(from: mute.sid), muted: mute.muted) }

case let .leave(leave):
_delegate.notifyDetached { await $0.signalClient(self, didReceiveLeave: leave.canReconnect, reason: leave.reason) }
_delegate.notifyDetached { await $0.signalClient(self, didReceiveLeave: leave.action, reason: leave.reason) }

case let .streamStateUpdate(states):
_delegate.notifyDetached { await $0.signalClient(self, didUpdateTrackStreamStates: states.streamStates) }
Expand Down Expand Up @@ -358,17 +362,17 @@ extension SignalClient {
// MARK: - Send methods

extension SignalClient {
func send(offer: LKRTCSessionDescription) async throws {
func send(offer: LKRTCSessionDescription, offerId: UInt32) async throws {
let r = Livekit_SignalRequest.with {
$0.offer = offer.toPBType()
$0.offer = offer.toPBType(offerId: offerId)
}

try await _sendRequest(r)
}

func send(answer: LKRTCSessionDescription) async throws {
func send(answer: LKRTCSessionDescription, offerId: UInt32) async throws {
let r = Livekit_SignalRequest.with {
$0.answer = answer.toPBType()
$0.answer = answer.toPBType(offerId: offerId)
}

try await _sendRequest(r)
Expand Down
21 changes: 19 additions & 2 deletions Sources/LiveKit/Core/Transport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ internal import LiveKitWebRTC
actor Transport: NSObject, Loggable {
// MARK: - Types

typealias OnOfferBlock = @Sendable (LKRTCSessionDescription) async throws -> Void
typealias OnOfferBlock = @Sendable (LKRTCSessionDescription, UInt32) async throws -> Void

// MARK: - Public

Expand Down Expand Up @@ -56,6 +56,7 @@ actor Transport: NSObject, Loggable {
private var _reNegotiate: Bool = false
private var _onOffer: OnOfferBlock?
private var _isRestartingIce: Bool = false
private var _latestOfferId: UInt32 = 0

// forbid direct access to PeerConnection
private let _pc: LKRTCPeerConnection
Expand Down Expand Up @@ -110,6 +111,20 @@ actor Transport: NSObject, Loggable {
await _iceCandidatesQueue.process(candidate, if: remoteDescription != nil && !_isRestartingIce)
}

func set(remoteDescription sd: LKRTCSessionDescription, offerId: UInt32) async throws {
if signalingState != .haveLocalOffer {
log("Received answer with unexpected signaling state: \(signalingState), expected .haveLocalOffer", .warning)
}

if offerId == 0 {
log("Skipping validation for legacy server (missing offerId), latestOfferId: \(_latestOfferId)", .warning)
} else if offerId != _latestOfferId {
throw LiveKitError(.invalidState, message: "OfferId mismatch, expected \(_latestOfferId) but got \(offerId)")
}

try await set(remoteDescription: sd)
}

func set(remoteDescription sd: LKRTCSessionDescription) async throws {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<Void, Error>) in
_pc.setRemoteDescription(sd) { error in
Expand Down Expand Up @@ -157,12 +172,14 @@ actor Transport: NSObject, Loggable {

// Actually negotiate
func _negotiateSequence() async throws {
_latestOfferId += 1
let offer = try await createOffer(for: constraints)
try await set(localDescription: offer)
try await _onOffer(offer)
try await _onOffer(offer, _latestOfferId)
}

if signalingState == .haveLocalOffer, iceRestart, let sd = remoteDescription {
_reNegotiate = false // Clear flag to prevent double offer
Copy link
Contributor Author

@pblazej pblazej Nov 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JS around https://github.com/livekit/client-sdk-js/blob/65e0bfe38ba594bbfe73de60ce72dbc7b96be3a2/src/room/PCTransport.ts#L271

if (this._pc && this._pc.signalingState === 'have-local-offer') {
    const currentSD = this._pc.remoteDescription;
    if (options?.iceRestart && currentSD) {
        // 1. Rollback: Sets remote description
        await this._pc.setRemoteDescription(currentSD);
        // 2. DOES NOT set this.renegotiate = true
        // 3. Falls through to create offer (line 291)
    } else {
        // 1. Defer: Sets renegotiate = true
        this.renegotiate = true;
        // 2. Returns immediately (skips offer creation)
        return;
    }
}

vs Swift

if signalingState == .haveLocalOffer {
    if !(iceRestart && remoteDescription != nil) {
        // 1. Defer: Sets _reNegotiate = true
        _reNegotiate = true
        // 2. Returns immediately
        return
    }
    
    // Else: ICE Restart path falls through...
}

// ... (offer ID increment) ...

if signalingState == .haveLocalOffer, iceRestart, let sd = remoteDescription {
    // 1. Force clear _reNegotiate (Matches JS not setting it)
    _reNegotiate = false  
    // 2. Rollback: Sets remote description
    try await set(remoteDescription: sd)
    // 3. Creates new offer immediately
    return try await _negotiateSequence()
}

try await set(remoteDescription: sd)
return try await _negotiateSequence()
}
Expand Down
6 changes: 3 additions & 3 deletions Sources/LiveKit/Protocols/SignalClientDelegate.swift
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ internal import LiveKitWebRTC
protocol SignalClientDelegate: AnyObject, Sendable {
func signalClient(_ signalClient: SignalClient, didUpdateConnectionState newState: ConnectionState, oldState: ConnectionState, disconnectError: LiveKitError?) async
func signalClient(_ signalClient: SignalClient, didReceiveConnectResponse connectResponse: SignalClient.ConnectResponse) async
func signalClient(_ signalClient: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription) async
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription) async
func signalClient(_ signalClient: SignalClient, didReceiveAnswer answer: LKRTCSessionDescription, offerId: UInt32) async
func signalClient(_ signalClient: SignalClient, didReceiveOffer offer: LKRTCSessionDescription, offerId: UInt32) async
func signalClient(_ signalClient: SignalClient, didReceiveIceCandidate iceCandidate: IceCandidate, target: Livekit_SignalTarget) async
func signalClient(_ signalClient: SignalClient, didUnpublishLocalTrack localTrack: Livekit_TrackUnpublishedResponse) async
func signalClient(_ signalClient: SignalClient, didUpdateParticipants participants: [Livekit_ParticipantInfo]) async
Expand All @@ -34,6 +34,6 @@ protocol SignalClientDelegate: AnyObject, Sendable {
func signalClient(_ signalClient: SignalClient, didUpdateSubscribedCodecs codecs: [Livekit_SubscribedCodec], qualities: [Livekit_SubscribedQuality], forTrackSid sid: String) async
func signalClient(_ signalClient: SignalClient, didUpdateSubscriptionPermission permission: Livekit_SubscriptionPermissionUpdate) async
func signalClient(_ signalClient: SignalClient, didUpdateToken token: String) async
func signalClient(_ signalClient: SignalClient, didReceiveLeave canReconnect: Bool, reason: Livekit_DisconnectReason) async
func signalClient(_ signalClient: SignalClient, didReceiveLeave action: Livekit_LeaveRequest.Action, reason: Livekit_DisconnectReason) async
func signalClient(_ signalClient: SignalClient, didSubscribeTrack trackSid: Track.Sid) async
}
7 changes: 4 additions & 3 deletions Sources/LiveKit/Types/SessionDescription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@
internal import LiveKitWebRTC

extension LKRTCSessionDescription {
func toPBType() -> Livekit_SessionDescription {
func toPBType(offerId: UInt32) -> Livekit_SessionDescription {
var sd = Livekit_SessionDescription()
sd.sdp = sdp
sd.id = offerId

switch type {
case .answer: sd.type = "answer"
Expand All @@ -33,7 +34,7 @@ extension LKRTCSessionDescription {
}

extension Livekit_SessionDescription {
func toRTCType() -> LKRTCSessionDescription {
func toRTCType() -> (LKRTCSessionDescription, UInt32) {
var sdpType: LKRTCSdpType
switch type {
case "answer": sdpType = .answer
Expand All @@ -42,6 +43,6 @@ extension Livekit_SessionDescription {
default: fatalError("Unknown state \(type)") // This should never happen
}

return RTC.createSessionDescription(type: sdpType, sdp: sdp)
return (RTC.createSessionDescription(type: sdpType, sdp: sdp), id)
}
}
Loading