-
Notifications
You must be signed in to change notification settings - Fork 10.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[stdlib]: better align Async{Throwing}Stream implementations #76571
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -120,57 +120,32 @@ extension AsyncStream { | |
} | ||
|
||
func yield(_ value: __owned Element) -> Continuation.YieldResult { | ||
var result: Continuation.YieldResult | ||
let result: Continuation.YieldResult | ||
lock() | ||
let limit = state.limit | ||
let count = state.pending.count | ||
|
||
if !state.continuations.isEmpty { | ||
// Presence of continuations implies no pending elements. | ||
// TODO: which assertion flavor should be used? | ||
assert( | ||
state.pending.isEmpty, | ||
"Continuations should imply no pending elements." | ||
) | ||
Comment on lines
+129
to
+134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i would appreciate feedback on a reasonable way to enforce this invariant. per the docs regarding assertion flavors in the stdlib, i would be inclined to think that There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Basically assert is this is ok to not check in a released toolchain, and precondition if it should keep getting checked. Don't worry too much about the internal versions. I'm not sure the message explains the issue very well, maybe make it more descriptive why this is a crash |
||
let continuation = state.continuations.removeFirst() | ||
if count > 0 { | ||
if !state.terminal { | ||
switch limit { | ||
case .unbounded: | ||
state.pending.append(value) | ||
result = .enqueued(remaining: .max) | ||
case .bufferingOldest(let limit): | ||
if count < limit { | ||
state.pending.append(value) | ||
result = .enqueued(remaining: limit - (count + 1)) | ||
} else { | ||
result = .dropped(value) | ||
} | ||
case .bufferingNewest(let limit): | ||
if count < limit { | ||
state.pending.append(value) | ||
result = .enqueued(remaining: limit - (count + 1)) | ||
} else if count > 0 { | ||
result = .dropped(state.pending.removeFirst()) | ||
state.pending.append(value) | ||
} else { | ||
result = .dropped(value) | ||
} | ||
} | ||
} else { | ||
result = .terminated | ||
} | ||
let toSend = state.pending.removeFirst() | ||
unlock() | ||
continuation.resume(returning: toSend) | ||
} else if state.terminal { | ||
if state.terminal { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. per @FranzBusch's prior observation, the branches in which we were handling cases that implied both pending elements and outstanding continuations should in fact be dead. the rationale is that elements should only be buffered if there are no awaiting consumers, and continuations are stored only if there are no pending elements. sanity checks appreciated regarding this analysis, however. |
||
result = .terminated | ||
unlock() | ||
continuation.resume(returning: nil) | ||
} else { | ||
switch limit { | ||
case .unbounded: | ||
result = .enqueued(remaining: .max) | ||
case .bufferingNewest(let limit): | ||
result = .enqueued(remaining: limit) | ||
case .bufferingOldest(let limit): | ||
result = .enqueued(remaining: limit) | ||
case .bufferingNewest(let limit): | ||
result = .enqueued(remaining: limit) | ||
} | ||
|
||
unlock() | ||
continuation.resume(returning: value) | ||
} | ||
|
@@ -218,6 +193,11 @@ extension AsyncStream { | |
return | ||
} | ||
|
||
assert( | ||
state.pending.isEmpty, | ||
"Continuations should imply no pending elements." | ||
) | ||
|
||
// Hold on to the continuations to resume outside the lock. | ||
let continuations = state.continuations | ||
state.continuations.removeAll() | ||
|
@@ -287,7 +267,7 @@ extension AsyncThrowingStream { | |
} | ||
|
||
struct State { | ||
var continuation: UnsafeContinuation<Element?, Error>? | ||
var continuations = [UnsafeContinuation<Element?, Error>]() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. primary behavioral changes to make AsyncThrowingStream better match AsyncStream's existing behavior start here. |
||
var pending = _Deque<Element>() | ||
let limit: Continuation.BufferingPolicy | ||
var onTermination: TerminationHandler? | ||
|
@@ -349,45 +329,19 @@ extension AsyncThrowingStream { | |
} | ||
|
||
func yield(_ value: __owned Element) -> Continuation.YieldResult { | ||
var result: Continuation.YieldResult | ||
let result: Continuation.YieldResult | ||
lock() | ||
let limit = state.limit | ||
let count = state.pending.count | ||
if let continuation = state.continuation { | ||
if count > 0 { | ||
if state.terminal == nil { | ||
switch limit { | ||
case .unbounded: | ||
result = .enqueued(remaining: .max) | ||
state.pending.append(value) | ||
case .bufferingOldest(let limit): | ||
if count < limit { | ||
result = .enqueued(remaining: limit - (count + 1)) | ||
state.pending.append(value) | ||
} else { | ||
result = .dropped(value) | ||
} | ||
case .bufferingNewest(let limit): | ||
if count < limit { | ||
state.pending.append(value) | ||
result = .enqueued(remaining: limit - (count + 1)) | ||
} else if count > 0 { | ||
result = .dropped(state.pending.removeFirst()) | ||
state.pending.append(value) | ||
} else { | ||
result = .dropped(value) | ||
} | ||
} | ||
} else { | ||
result = .terminated | ||
} | ||
state.continuation = nil | ||
let toSend = state.pending.removeFirst() | ||
unlock() | ||
continuation.resume(returning: toSend) | ||
} else if let terminal = state.terminal { | ||
|
||
if !state.continuations.isEmpty { | ||
assert( | ||
state.pending.isEmpty, | ||
"Continuations should imply no pending elements." | ||
) | ||
let continuation = state.continuations.removeFirst() | ||
if let terminal = state.terminal { | ||
result = .terminated | ||
state.continuation = nil | ||
state.terminal = .finished | ||
unlock() | ||
switch terminal { | ||
|
@@ -405,8 +359,6 @@ extension AsyncThrowingStream { | |
case .bufferingNewest(let limit): | ||
result = .enqueued(remaining: limit) | ||
} | ||
|
||
state.continuation = nil | ||
unlock() | ||
continuation.resume(returning: value) | ||
} | ||
|
@@ -446,64 +398,67 @@ extension AsyncThrowingStream { | |
lock() | ||
let handler = state.onTermination | ||
state.onTermination = nil | ||
|
||
let terminal: Terminal | ||
if let failure = error { | ||
terminal = .failed(failure) | ||
} else { | ||
terminal = .finished | ||
} | ||
|
||
// Update terminal state if needed | ||
if state.terminal == nil { | ||
if let failure = error { | ||
state.terminal = .failed(failure) | ||
} else { | ||
state.terminal = .finished | ||
} | ||
state.terminal = terminal | ||
} | ||
|
||
if let continuation = state.continuation { | ||
if state.pending.count > 0 { | ||
state.continuation = nil | ||
let toSend = state.pending.removeFirst() | ||
unlock() | ||
handler?(.finished(error)) | ||
continuation.resume(returning: toSend) | ||
} else if let terminal = state.terminal { | ||
state.continuation = nil | ||
guard !state.continuations.isEmpty else { | ||
unlock() | ||
handler?(.finished(error)) | ||
return | ||
} | ||
|
||
assert( | ||
state.pending.isEmpty, | ||
"Continuations should imply no pending elements." | ||
) | ||
|
||
// Hold on to the continuations to resume outside the lock. | ||
let continuations = state.continuations | ||
state.continuations.removeAll() | ||
|
||
unlock() | ||
handler?(.finished(error)) | ||
|
||
for continuation in continuations { | ||
switch terminal { | ||
case .finished: | ||
continuation.resume(returning: nil) | ||
case .failed(let error): | ||
continuation.resume(throwing: error) | ||
} | ||
} else { | ||
unlock() | ||
handler?(.finished(error)) | ||
} | ||
} else { | ||
unlock() | ||
handler?(.finished(error)) | ||
} | ||
} | ||
|
||
func next(_ continuation: UnsafeContinuation<Element?, Error>) { | ||
lock() | ||
if state.continuation == nil { | ||
if state.pending.count > 0 { | ||
let toSend = state.pending.removeFirst() | ||
unlock() | ||
continuation.resume(returning: toSend) | ||
} else if let terminal = state.terminal { | ||
state.terminal = .finished | ||
unlock() | ||
switch terminal { | ||
case .finished: | ||
continuation.resume(returning: nil) | ||
case .failed(let error): | ||
continuation.resume(throwing: error) | ||
} | ||
} else { | ||
state.continuation = continuation | ||
unlock() | ||
state.continuations.append(continuation) | ||
if state.pending.count > 0 { | ||
let cont = state.continuations.removeFirst() | ||
let toSend = state.pending.removeFirst() | ||
unlock() | ||
cont.resume(returning: toSend) | ||
} else if let terminal = state.terminal { | ||
state.terminal = .finished | ||
let cont = state.continuations.removeFirst() | ||
unlock() | ||
switch terminal { | ||
case .finished: | ||
cont.resume(returning: nil) | ||
case .failed(let error): | ||
cont.resume(throwing: error) | ||
} | ||
} else { | ||
unlock() | ||
fatalError("attempt to await next() on more than one task") | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the 'this type isn't Sendable' assertion is still accurate, but i'm not sure it's particularly relevant or helpful to call out. the other warnings and references to fatalError() are no longer correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd instead add why it's not sendable. People sometimes ask about it, we should explain that so they don't assume it's just because it's "missing"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
excellent point, thank you.
this is, in fact, my current assumption. i presume it's currently not Sendable mainly due to path-dependence/history – the original implementations intentionally did not support concurrent consumption, but then that restriction was lifted. the implementation of the relevant types appear to be effectively Sendable (basically everything gets forwarded from the public API into the various internal 'storage' types which appear generally thread-safe).