From 8d9f46c412d70b553944f7e40d2b3b6e8863454b Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Mon, 7 Mar 2022 16:45:19 -0800 Subject: [PATCH 1/2] Enable Sendability for AsyncStream and AsyncThrowingStream --- stdlib/public/Concurrency/AsyncStream.swift | 3 ++ .../Concurrency/AsyncStreamBuffer.swift | 48 +++++++++---------- .../Concurrency/AsyncThrowingStream.swift | 3 ++ 3 files changed, 30 insertions(+), 24 deletions(-) diff --git a/stdlib/public/Concurrency/AsyncStream.swift b/stdlib/public/Concurrency/AsyncStream.swift index 7571e6b8aba64..18187a7682ba7 100644 --- a/stdlib/public/Concurrency/AsyncStream.swift +++ b/stdlib/public/Concurrency/AsyncStream.swift @@ -428,3 +428,6 @@ extension AsyncStream.Continuation { return storage.yield(()) } } + +@available(SwiftStdlib 5.1, *) +extension AsyncStream: @unchecked Sendable where Element: Sendable { } diff --git a/stdlib/public/Concurrency/AsyncStreamBuffer.swift b/stdlib/public/Concurrency/AsyncStreamBuffer.swift index 41aed33a72637..ce62a012455bd 100644 --- a/stdlib/public/Concurrency/AsyncStreamBuffer.swift +++ b/stdlib/public/Concurrency/AsyncStreamBuffer.swift @@ -17,7 +17,7 @@ import Swift import Darwin func _lockWordCount() -> Int { - let sz = + let sz = MemoryLayout.size / MemoryLayout.size return max(sz, 1) } @@ -57,7 +57,7 @@ extension AsyncStream { typealias TerminationHandler = @Sendable (Continuation.Termination) -> Void struct State { - var continuation: UnsafeContinuation? + var continuations = [UnsafeContinuation]() var pending = _Deque() let limit: Continuation.BufferingPolicy var onTermination: TerminationHandler? @@ -105,7 +105,7 @@ extension AsyncStream { } } - func cancel() { + @Sendable func cancel() { lock() // swap out the handler before we invoke it to prevent double cancel let handler = state.onTermination @@ -123,7 +123,8 @@ extension AsyncStream { lock() let limit = state.limit let count = state.pending.count - if let continuation = state.continuation { + + if let continuation = state.continuations.first { if count > 0 { if !state.terminal { switch limit { @@ -151,17 +152,17 @@ extension AsyncStream { } else { result = .terminated } - state.continuation = nil + state.continuations.removeFirst() let toSend = state.pending.removeFirst() unlock() continuation.resume(returning: toSend) } else if state.terminal { - state.continuation = nil + state.continuations.removeFirst() result = .terminated unlock() continuation.resume(returning: nil) } else { - state.continuation = nil + state.continuations.removeFirst() switch limit { case .unbounded: result = .enqueued(remaining: .max) @@ -212,15 +213,15 @@ extension AsyncStream { state.onTermination = nil state.terminal = true - if let continuation = state.continuation { + if let continuation = state.continuations.first { if state.pending.count > 0 { - state.continuation = nil + state.continuations.removeFirst() let toSend = state.pending.removeFirst() unlock() handler?(.finished) continuation.resume(returning: toSend) } else if state.terminal { - state.continuation = nil + state.continuations.removeFirst() unlock() handler?(.finished) continuation.resume(returning: nil) @@ -236,22 +237,20 @@ extension AsyncStream { func next(_ continuation: UnsafeContinuation) { lock() - if state.continuation == nil { - if state.pending.count > 0 { - let toSend = state.pending.removeFirst() - unlock() - continuation.resume(returning: toSend) - } else if state.terminal { - unlock() - continuation.resume(returning: nil) - } else { - state.continuation = continuation - unlock() - } + state.continuations.append(continuation) + if state.pending.count > 0 { + let cont = state.continuations.removeFirst() + let toSend = state.pending.removeFirst() + unlock() + cont.resume(returning: toSend) + } else if state.terminal { + let cont = state.continuations.removeFirst() + unlock() + cont.resume(returning: nil) } else { unlock() - fatalError("attempt to await next() on more than one task") } + } func next() async -> Element? { @@ -341,7 +340,7 @@ extension AsyncThrowingStream { } } - func cancel() { + @Sendable func cancel() { lock() // swap out the handler before we invoke it to prevent double cancel let handler = state.onTermination @@ -595,3 +594,4 @@ final class _AsyncStreamCriticalStorage: @unchecked Sendable { return storage } } + diff --git a/stdlib/public/Concurrency/AsyncThrowingStream.swift b/stdlib/public/Concurrency/AsyncThrowingStream.swift index a1bc39ce1c220..6371f7fa22938 100644 --- a/stdlib/public/Concurrency/AsyncThrowingStream.swift +++ b/stdlib/public/Concurrency/AsyncThrowingStream.swift @@ -471,3 +471,6 @@ extension AsyncThrowingStream.Continuation { storage.yield(()) } } + +@available(SwiftStdlib 5.1, *) +extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { } From a16820484791d173495172cc98d23333d3efa091 Mon Sep 17 00:00:00 2001 From: Philippe Hausler Date: Tue, 8 Mar 2022 09:43:25 -0800 Subject: [PATCH 2/2] Move removeFirst to hit all cases where the continuations are not empty --- stdlib/public/Concurrency/AsyncStreamBuffer.swift | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/stdlib/public/Concurrency/AsyncStreamBuffer.swift b/stdlib/public/Concurrency/AsyncStreamBuffer.swift index ce62a012455bd..7aaae004ce622 100644 --- a/stdlib/public/Concurrency/AsyncStreamBuffer.swift +++ b/stdlib/public/Concurrency/AsyncStreamBuffer.swift @@ -124,7 +124,8 @@ extension AsyncStream { let limit = state.limit let count = state.pending.count - if let continuation = state.continuations.first { + if !state.continuations.isEmpty { + let continuation = state.continuations.removeFirst() if count > 0 { if !state.terminal { switch limit { @@ -152,17 +153,14 @@ extension AsyncStream { } else { result = .terminated } - state.continuations.removeFirst() let toSend = state.pending.removeFirst() unlock() continuation.resume(returning: toSend) } else if state.terminal { - state.continuations.removeFirst() result = .terminated unlock() continuation.resume(returning: nil) } else { - state.continuations.removeFirst() switch limit { case .unbounded: result = .enqueued(remaining: .max)