Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[stdlib]: better align Async{Throwing}Stream implementations #76571

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions stdlib/public/Concurrency/AsyncStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -356,11 +356,6 @@ public struct AsyncStream<Element> {
@available(SwiftStdlib 5.1, *)
extension AsyncStream: AsyncSequence {
/// The asynchronous iterator for iterating an asynchronous stream.
///
/// This type doesn't conform to `Sendable`. Don't use it from multiple
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the 'this type isn't Sendable' assertion is still accurate, but i'm not sure it's particularly relevant or helpful to call out. the other warnings and references to fatalError() are no longer correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd instead add why it's not sendable. People sometimes ask about it, we should explain that so they don't assume it's just because it's "missing"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

excellent point, thank you.

we should explain that so they don't assume it's just because it's "missing"

this is, in fact, my current assumption. i presume it's currently not Sendable mainly due to path-dependence/history – the original implementations intentionally did not support concurrent consumption, but then that restriction was lifted. the implementation of the relevant types appear to be effectively Sendable (basically everything gets forwarded from the public API into the various internal 'storage' types which appear generally thread-safe).

/// concurrent contexts. It is a programmer error to invoke `next()` from a
/// concurrent context that contends with another such call, which
/// results in a call to `fatalError()`.
public struct Iterator: AsyncIteratorProtocol {
let context: _Context

Expand All @@ -369,10 +364,6 @@ extension AsyncStream: AsyncSequence {
/// When `next()` returns `nil`, this signifies the end of the
/// `AsyncStream`.
///
/// It is a programmer error to invoke `next()` from a
/// concurrent context that contends with another such call, which
/// results in a call to `fatalError()`.
///
/// If you cancel the task this iterator is running in while `next()` is
/// awaiting a value, the `AsyncStream` terminates. In this case, `next()`
/// might return `nil` immediately, or return `nil` on subsequent calls.
Expand All @@ -385,10 +376,6 @@ extension AsyncStream: AsyncSequence {
/// When `next()` returns `nil`, this signifies the end of the
/// `AsyncStream`.
///
/// It is a programmer error to invoke `next()` from a concurrent
/// context that contends with another such call, which results in a call to
/// `fatalError()`.
///
/// If you cancel the task this iterator is running in while `next()`
/// is awaiting a value, the `AsyncStream` terminates. In this case,
/// `next()` might return `nil` immediately, or return `nil` on
Expand Down
179 changes: 67 additions & 112 deletions stdlib/public/Concurrency/AsyncStreamBuffer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -120,57 +120,32 @@ extension AsyncStream {
}

func yield(_ value: __owned Element) -> Continuation.YieldResult {
var result: Continuation.YieldResult
let result: Continuation.YieldResult
lock()
let limit = state.limit
let count = state.pending.count

if !state.continuations.isEmpty {
// Presence of continuations implies no pending elements.
// TODO: which assertion flavor should be used?
assert(
state.pending.isEmpty,
"Continuations should imply no pending elements."
)
Comment on lines +129 to +134
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would appreciate feedback on a reasonable way to enforce this invariant. per the docs regarding assertion flavors in the stdlib, i would be inclined to think that _internalInvariant() is maybe the most appropriate function to use here. however, that symbol isn't visible in this context since this code resides in the Concurrency module. any thoughts on this, or references to prior art would be appreciated!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically assert is this is ok to not check in a released toolchain, and precondition if it should keep getting checked. Don't worry too much about the internal versions. I'm not sure the message explains the issue very well, maybe make it more descriptive why this is a crash

let continuation = state.continuations.removeFirst()
if count > 0 {
if !state.terminal {
switch limit {
case .unbounded:
state.pending.append(value)
result = .enqueued(remaining: .max)
case .bufferingOldest(let limit):
if count < limit {
state.pending.append(value)
result = .enqueued(remaining: limit - (count + 1))
} else {
result = .dropped(value)
}
case .bufferingNewest(let limit):
if count < limit {
state.pending.append(value)
result = .enqueued(remaining: limit - (count + 1))
} else if count > 0 {
result = .dropped(state.pending.removeFirst())
state.pending.append(value)
} else {
result = .dropped(value)
}
}
} else {
result = .terminated
}
let toSend = state.pending.removeFirst()
unlock()
continuation.resume(returning: toSend)
} else if state.terminal {
if state.terminal {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

per @FranzBusch's prior observation, the branches in which we were handling cases that implied both pending elements and outstanding continuations should in fact be dead. the rationale is that elements should only be buffered if there are no awaiting consumers, and continuations are stored only if there are no pending elements. sanity checks appreciated regarding this analysis, however.

result = .terminated
unlock()
continuation.resume(returning: nil)
} else {
switch limit {
case .unbounded:
result = .enqueued(remaining: .max)
case .bufferingNewest(let limit):
result = .enqueued(remaining: limit)
case .bufferingOldest(let limit):
result = .enqueued(remaining: limit)
case .bufferingNewest(let limit):
result = .enqueued(remaining: limit)
}

unlock()
continuation.resume(returning: value)
}
Expand Down Expand Up @@ -218,6 +193,11 @@ extension AsyncStream {
return
}

assert(
state.pending.isEmpty,
"Continuations should imply no pending elements."
)

// Hold on to the continuations to resume outside the lock.
let continuations = state.continuations
state.continuations.removeAll()
Expand Down Expand Up @@ -287,7 +267,7 @@ extension AsyncThrowingStream {
}

struct State {
var continuation: UnsafeContinuation<Element?, Error>?
var continuations = [UnsafeContinuation<Element?, Error>]()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

primary behavioral changes to make AsyncThrowingStream better match AsyncStream's existing behavior start here.

var pending = _Deque<Element>()
let limit: Continuation.BufferingPolicy
var onTermination: TerminationHandler?
Expand Down Expand Up @@ -349,45 +329,19 @@ extension AsyncThrowingStream {
}

func yield(_ value: __owned Element) -> Continuation.YieldResult {
var result: Continuation.YieldResult
let result: Continuation.YieldResult
lock()
let limit = state.limit
let count = state.pending.count
if let continuation = state.continuation {
if count > 0 {
if state.terminal == nil {
switch limit {
case .unbounded:
result = .enqueued(remaining: .max)
state.pending.append(value)
case .bufferingOldest(let limit):
if count < limit {
result = .enqueued(remaining: limit - (count + 1))
state.pending.append(value)
} else {
result = .dropped(value)
}
case .bufferingNewest(let limit):
if count < limit {
state.pending.append(value)
result = .enqueued(remaining: limit - (count + 1))
} else if count > 0 {
result = .dropped(state.pending.removeFirst())
state.pending.append(value)
} else {
result = .dropped(value)
}
}
} else {
result = .terminated
}
state.continuation = nil
let toSend = state.pending.removeFirst()
unlock()
continuation.resume(returning: toSend)
} else if let terminal = state.terminal {

if !state.continuations.isEmpty {
assert(
state.pending.isEmpty,
"Continuations should imply no pending elements."
)
let continuation = state.continuations.removeFirst()
if let terminal = state.terminal {
result = .terminated
state.continuation = nil
state.terminal = .finished
unlock()
switch terminal {
Expand All @@ -405,8 +359,6 @@ extension AsyncThrowingStream {
case .bufferingNewest(let limit):
result = .enqueued(remaining: limit)
}

state.continuation = nil
unlock()
continuation.resume(returning: value)
}
Expand Down Expand Up @@ -446,64 +398,67 @@ extension AsyncThrowingStream {
lock()
let handler = state.onTermination
state.onTermination = nil

let terminal: Terminal
if let failure = error {
terminal = .failed(failure)
} else {
terminal = .finished
}

// Update terminal state if needed
if state.terminal == nil {
if let failure = error {
state.terminal = .failed(failure)
} else {
state.terminal = .finished
}
state.terminal = terminal
}

if let continuation = state.continuation {
if state.pending.count > 0 {
state.continuation = nil
let toSend = state.pending.removeFirst()
unlock()
handler?(.finished(error))
continuation.resume(returning: toSend)
} else if let terminal = state.terminal {
state.continuation = nil
guard !state.continuations.isEmpty else {
unlock()
handler?(.finished(error))
return
}

assert(
state.pending.isEmpty,
"Continuations should imply no pending elements."
)

// Hold on to the continuations to resume outside the lock.
let continuations = state.continuations
state.continuations.removeAll()

unlock()
handler?(.finished(error))

for continuation in continuations {
switch terminal {
case .finished:
continuation.resume(returning: nil)
case .failed(let error):
continuation.resume(throwing: error)
}
} else {
unlock()
handler?(.finished(error))
}
} else {
unlock()
handler?(.finished(error))
}
}

func next(_ continuation: UnsafeContinuation<Element?, Error>) {
lock()
if state.continuation == nil {
if state.pending.count > 0 {
let toSend = state.pending.removeFirst()
unlock()
continuation.resume(returning: toSend)
} else if let terminal = state.terminal {
state.terminal = .finished
unlock()
switch terminal {
case .finished:
continuation.resume(returning: nil)
case .failed(let error):
continuation.resume(throwing: error)
}
} else {
state.continuation = continuation
unlock()
state.continuations.append(continuation)
if state.pending.count > 0 {
let cont = state.continuations.removeFirst()
let toSend = state.pending.removeFirst()
unlock()
cont.resume(returning: toSend)
} else if let terminal = state.terminal {
state.terminal = .finished
let cont = state.continuations.removeFirst()
unlock()
switch terminal {
case .finished:
cont.resume(returning: nil)
case .failed(let error):
cont.resume(throwing: error)
}
} else {
unlock()
fatalError("attempt to await next() on more than one task")
}
}

Expand Down
13 changes: 0 additions & 13 deletions stdlib/public/Concurrency/AsyncThrowingStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,6 @@ public struct AsyncThrowingStream<Element, Failure: Error> {
@available(SwiftStdlib 5.1, *)
extension AsyncThrowingStream: AsyncSequence {
/// The asynchronous iterator for iterating an asynchronous stream.
///
/// This type is not `Sendable`. Don't use it from multiple
/// concurrent contexts. It is a programmer error to invoke `next()` from a
/// concurrent context that contends with another such call, which
/// results in a call to `fatalError()`.
public struct Iterator: AsyncIteratorProtocol {
let context: _Context

Expand All @@ -405,10 +400,6 @@ extension AsyncThrowingStream: AsyncSequence {
/// When `next()` returns `nil`, this signifies the end of the
/// `AsyncThrowingStream`.
///
/// It is a programmer error to invoke `next()` from a concurrent context
/// that contends with another such call, which results in a call to
/// `fatalError()`.
///
/// If you cancel the task this iterator is running in while `next()` is
/// awaiting a value, the `AsyncThrowingStream` terminates. In this case,
/// `next()` may return `nil` immediately, or else return `nil` on
Expand All @@ -422,10 +413,6 @@ extension AsyncThrowingStream: AsyncSequence {
/// When `next()` returns `nil`, this signifies the end of the
/// `AsyncThrowingStream`.
///
/// It is a programmer error to invoke `next()` from a concurrent
/// context that contends with another such call, which results in a call to
/// `fatalError()`.
///
/// If you cancel the task this iterator is running in while `next()`
/// is awaiting a value, the `AsyncThrowingStream` terminates. In this case,
/// `next()` may return `nil` immediately, or else return `nil` on
Expand Down
Loading