diff --git a/stdlib/public/Concurrency/AsyncStream.swift b/stdlib/public/Concurrency/AsyncStream.swift index 7571e6b8aba64..18187a7682ba7 100644 --- a/stdlib/public/Concurrency/AsyncStream.swift +++ b/stdlib/public/Concurrency/AsyncStream.swift @@ -428,3 +428,6 @@ extension AsyncStream.Continuation { return storage.yield(()) } } + +@available(SwiftStdlib 5.1, *) +extension AsyncStream: @unchecked Sendable where Element: Sendable { } diff --git a/stdlib/public/Concurrency/AsyncStreamBuffer.swift b/stdlib/public/Concurrency/AsyncStreamBuffer.swift index 41aed33a72637..7aaae004ce622 100644 --- a/stdlib/public/Concurrency/AsyncStreamBuffer.swift +++ b/stdlib/public/Concurrency/AsyncStreamBuffer.swift @@ -17,7 +17,7 @@ import Swift import Darwin func _lockWordCount() -> Int { - let sz = + let sz = MemoryLayout.size / MemoryLayout.size return max(sz, 1) } @@ -57,7 +57,7 @@ extension AsyncStream { typealias TerminationHandler = @Sendable (Continuation.Termination) -> Void struct State { - var continuation: UnsafeContinuation? + var continuations = [UnsafeContinuation]() var pending = _Deque() let limit: Continuation.BufferingPolicy var onTermination: TerminationHandler? @@ -105,7 +105,7 @@ extension AsyncStream { } } - func cancel() { + @Sendable func cancel() { lock() // swap out the handler before we invoke it to prevent double cancel let handler = state.onTermination @@ -123,7 +123,9 @@ extension AsyncStream { lock() let limit = state.limit let count = state.pending.count - if let continuation = state.continuation { + + if !state.continuations.isEmpty { + let continuation = state.continuations.removeFirst() if count > 0 { if !state.terminal { switch limit { @@ -151,17 +153,14 @@ extension AsyncStream { } else { result = .terminated } - state.continuation = nil let toSend = state.pending.removeFirst() unlock() continuation.resume(returning: toSend) } else if state.terminal { - state.continuation = nil result = .terminated unlock() continuation.resume(returning: nil) } else { - state.continuation = nil switch limit { case .unbounded: result = .enqueued(remaining: .max) @@ -212,15 +211,15 @@ extension AsyncStream { state.onTermination = nil state.terminal = true - if let continuation = state.continuation { + if let continuation = state.continuations.first { if state.pending.count > 0 { - state.continuation = nil + state.continuations.removeFirst() let toSend = state.pending.removeFirst() unlock() handler?(.finished) continuation.resume(returning: toSend) } else if state.terminal { - state.continuation = nil + state.continuations.removeFirst() unlock() handler?(.finished) continuation.resume(returning: nil) @@ -236,22 +235,20 @@ extension AsyncStream { func next(_ continuation: UnsafeContinuation) { lock() - if state.continuation == nil { - if state.pending.count > 0 { - let toSend = state.pending.removeFirst() - unlock() - continuation.resume(returning: toSend) - } else if state.terminal { - unlock() - continuation.resume(returning: nil) - } else { - state.continuation = continuation - unlock() - } + state.continuations.append(continuation) + if state.pending.count > 0 { + let cont = state.continuations.removeFirst() + let toSend = state.pending.removeFirst() + unlock() + cont.resume(returning: toSend) + } else if state.terminal { + let cont = state.continuations.removeFirst() + unlock() + cont.resume(returning: nil) } else { unlock() - fatalError("attempt to await next() on more than one task") } + } func next() async -> Element? { @@ -341,7 +338,7 @@ extension AsyncThrowingStream { } } - func cancel() { + @Sendable func cancel() { lock() // swap out the handler before we invoke it to prevent double cancel let handler = state.onTermination @@ -595,3 +592,4 @@ final class _AsyncStreamCriticalStorage: @unchecked Sendable { return storage } } + diff --git a/stdlib/public/Concurrency/AsyncThrowingStream.swift b/stdlib/public/Concurrency/AsyncThrowingStream.swift index a1bc39ce1c220..6371f7fa22938 100644 --- a/stdlib/public/Concurrency/AsyncThrowingStream.swift +++ b/stdlib/public/Concurrency/AsyncThrowingStream.swift @@ -471,3 +471,6 @@ extension AsyncThrowingStream.Continuation { storage.yield(()) } } + +@available(SwiftStdlib 5.1, *) +extension AsyncThrowingStream: @unchecked Sendable where Element: Sendable { }