Subscribable is to observable as _what_ is to enumerable? #131

Open
NickDarvey opened this issue Jan 23, 2023 · 1 comment

Comments

@NickDarvey

Subscribable.Subscribe returns a subscription, which gives us a way to traverse the computation graph of our subscribables. What would the equivalent be for enumerables?

I know I'm supposed to 'flip the arrows' to figure this out, but I'm struggling. @bartdesmet are you around to walk me through this?

(For context, I'd love the flexibility to go push-to-pull in a processing pipeline. I'm trying to understand what this new enumerable interface might look like where I could implement the ability to walk the operator graph, across the push-to-pull boundary, for things like checkpointing.)

@idg10
Contributor

idg10 commented Jul 7, 2023

The simplest way to look at this might be to consider plain Rx—the IObservable<T> and IObserver<T> interfaces built into the .NET runtime libraries. Those were the original incarnation of the push dual of IEnumerable<T>/IEnumerator<T>.

Calling Subscribe on an IObservable<T> means you want to start receiving items from it, so its counterpart is calling GetEnumerator on an IEnumerable<T>.

Calling Dispose on the IDisposable returned by IObservable<T>.Subscribe indicates that you don't want to continue to receive items from the source, regardless of whether the source has more to give. In the pull world, we indicate this by calling Dispose on the IEnumerator<T>.
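To make the pull-side half of that correspondence concrete, here is a small sketch using only BCL types (the names `PullEarlyStop`, `Naturals` and `TakeFirst` are made up for illustration). A C# iterator's `finally` block runs when its enumerator is disposed, so you can see that stopping early is a consumer-initiated `Dispose` even though the source has infinitely more to give:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration type, not part of Rx or Reaqtive.
public static class PullEarlyStop
{
    // Records what happens to the source so the effect of Dispose is visible.
    public static readonly List<string> Log = new List<string>();

    // An infinite pull source. The finally block runs when the consumer
    // disposes the enumerator while the iterator is suspended.
    public static IEnumerable<int> Naturals()
    {
        try
        {
            for (int i = 0; ; i++)
            {
                yield return i;
            }
        }
        finally
        {
            Log.Add("source cleaned up");
        }
    }

    public static List<int> TakeFirst(int count)
    {
        var received = new List<int>();
        // GetEnumerator plays the role that Subscribe plays in the push world.
        using (IEnumerator<int> e = Naturals().GetEnumerator())
        {
            while (received.Count < count && e.MoveNext())
            {
                received.Add(e.Current);
            }
        } // Disposing the enumerator here is the counterpart of disposing a subscription.
        return received;
    }
}
```

`TakeFirst(3)` returns 0, 1, 2 and leaves "source cleaned up" in `Log`. A `foreach` loop with `break` does the same thing, since `foreach` disposes the enumerator for you.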

So there's a sense in which the IEnumerator<T> is the equivalent of the object returned from Subscribe: each individual consumer of events gets one of these. If multiple consumers are retrieving items from a source concurrently (where supported), each will have its own IEnumerator<T> in the pull world, and each will have its own object returned from Subscribe in the push world.
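A quick way to see the "one per consumer" point is to pull from two enumerators over the same enumerable. In this sketch (`IndependentConsumers`, `Next` and `Interleave` are hypothetical names), each enumerator keeps its own position, just as each subscription to an observable is its own independent conversation with the source:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration type.
public static class IndependentConsumers
{
    // Pulls the next item from an enumerator, failing loudly if the sequence ended.
    private static T Next<T>(IEnumerator<T> e)
    {
        if (!e.MoveNext()) throw new InvalidOperationException("sequence ended");
        return e.Current;
    }

    public static (List<int> First, List<int> Second) Interleave()
    {
        IEnumerable<int> source = new[] { 10, 20, 30 };
        var first = new List<int>();
        var second = new List<int>();

        // Each consumer gets its own enumerator, i.e. its own "subscription".
        using (IEnumerator<int> a = source.GetEnumerator())
        using (IEnumerator<int> b = source.GetEnumerator())
        {
            first.Add(Next(a));  // a yields 10
            first.Add(Next(a));  // a yields 20
            second.Add(Next(b)); // b starts independently and yields 10
            first.Add(Next(a));  // a yields 30
        }
        return (first, second);
    }
}
```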

Moving back to the Reaqtive world, with its augmented concept of subscription, the subscription object returned by ISubscribable<T>.Subscribe corresponds to the IEnumerator<T>.

This might sound surprising because people often talk about the IObserver<T> being equivalent to the IEnumerator<T>. It is, and it isn't. There's a catch here: when we flip the arrows, one of them doesn't get flipped:

| Operation | Initiator with IEnumerable | Initiator with IObservable |
|---|---|---|
| Receive next item/error/end | Consumer | Source |
| Terminate before reaching end | Consumer | Consumer |

So when going from pull to push, we go from the consumer fetching items to the source pushing items. But when it comes to deciding to stop early, that's a consumer-driven decision in both worlds. Even in Rx's push-based world, it's still the consumer of events that gets to decide that they want to stop early.

This leads to an asymmetry in the design. In the pull world, both the receive next and the terminate operations are initiated by the consumer invoking a method on the IEnumerator<T> (MoveNext and Dispose respectively). But in the push world, receive next involves the source calling a method on the consumer, whereas terminate involves the consumer calling a method on the source.

So in the push world, there needs to be a bidirectional association between source and consumer. The source needs to be able to invoke the consumer's OnNext, OnCompleted and OnError, so the consumer must supply an IObserver<T>. But the consumer also needs to be able to tell the source to stop early by calling Dispose, which is why IObservable<T>.Subscribe returns an object. You have what is conceptually a single thing (a subscription), but it manifests as a connected pair of objects (the IObserver<T> and the object returned by Subscribe) because of the need for bidirectional communication.
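Here is a minimal sketch of that pair of objects using the real `System.IObservable<T>`/`IObserver<T>` interfaces. `ManualSource<T>`, `ListObserver<T>` and `PushEarlyStop` are hypothetical types, not Rx or Reaqtive APIs, and the source is deliberately simplistic (single-threaded, no completion or error handling). It only pushes when `Emit` is called, so the consumer has a chance to dispose between items:

```csharp
using System;
using System.Collections.Generic;

// Hypothetical hot source: pushes values only when Emit is called.
public sealed class ManualSource<T> : IObservable<T>
{
    private readonly List<IObserver<T>> _observers = new List<IObserver<T>>();

    // The consumer hands the source an IObserver<T> (for source -> consumer calls)...
    public IDisposable Subscribe(IObserver<T> observer)
    {
        _observers.Add(observer);
        // ...and gets back an object it can use for consumer -> source calls.
        return new Subscription(this, observer);
    }

    public void Emit(T value)
    {
        // Iterate over a copy so an observer can unsubscribe during OnNext.
        foreach (var o in _observers.ToArray()) o.OnNext(value);
    }

    private sealed class Subscription : IDisposable
    {
        private readonly ManualSource<T> _source;
        private readonly IObserver<T> _observer;

        public Subscription(ManualSource<T> source, IObserver<T> observer)
        {
            _source = source;
            _observer = observer;
        }

        // Early termination is initiated by the consumer, even in the push world.
        public void Dispose() => _source._observers.Remove(_observer);
    }
}

// Consumer half of the pair: the source calls these methods.
public sealed class ListObserver<T> : IObserver<T>
{
    public List<T> Received { get; } = new List<T>();
    public void OnNext(T value) => Received.Add(value);
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class PushEarlyStop
{
    public static List<int> Run()
    {
        var source = new ManualSource<int>();
        var observer = new ListObserver<int>();
        IDisposable subscription = source.Subscribe(observer);

        source.Emit(1);
        source.Emit(2);
        subscription.Dispose(); // the consumer decides to stop early
        source.Emit(3);         // no longer delivered to this observer

        return observer.Received;
    }
}
```

`PushEarlyStop.Run()` returns 1 and 2 only. The observer and the returned `IDisposable` together make up the one conceptual subscription.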

The equivalent concept in IEnumerable<T> doesn't need a pair of objects because all the method invocations go in one direction: from consumer to source. The consumer doesn't need to supply an object to the source because the source never calls methods on the consumer. So instead of the pair of objects—IObserver<T> and subscription—the pull world represents the same concept with just one object, IEnumerator<T>.
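For contrast with the push-side pair, here is a hand-written enumerator as a sketch (`RangeEnumerator` and `SingleObjectDemo` are hypothetical names). Nothing is passed in from the consumer, and both the "receive next" and "terminate" operations are method calls the consumer makes on this one object:

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical pull source yielding 0, 1, ..., count - 1.
// The constructor takes no callback object: the source never calls the consumer.
public sealed class RangeEnumerator : IEnumerator<int>
{
    private readonly int _count;
    private int _position = -1;

    public RangeEnumerator(int count) => _count = count;

    public bool IsDisposed { get; private set; }

    public int Current => _position;
    object IEnumerator.Current => Current;

    // "Receive next item/end": called by the consumer.
    public bool MoveNext()
    {
        if (IsDisposed || _position + 1 >= _count) return false;
        _position++;
        return true;
    }

    // "Terminate before reaching end": also called by the consumer.
    public void Dispose() => IsDisposed = true;

    public void Reset() => throw new NotSupportedException();
}

public static class SingleObjectDemo
{
    public static (List<int> Items, bool MoreAfterDispose) Run()
    {
        var e = new RangeEnumerator(5);
        var items = new List<int>();
        while (items.Count < 2 && e.MoveNext()) items.Add(e.Current);
        e.Dispose(); // stop early; the remaining items are never produced
        return (items, e.MoveNext());
    }
}
```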

But what about traversal of the computation graph? Well, that's the main reason I suggested looking at the original Rx version: subscriptions in that world don't provide such a service, which is entirely consistent with the fact that enumerables don't offer a way to do that either. Reaqtive makes subscriptions slightly more powerful by adding this ability to walk the computation graph, but there is no equivalent of this in the pull world. You could imagine some augmented alternative to IEnumerator<T> that offered such a thing, as the dual of this Reaqtive feature, but in practice Reaqtive doesn't define anything like that for the pull world.
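Purely as a thought experiment (this is not something Reaqtive or .NET provides), such an augmented enumerator might look like the sketch below. `IOperatorNode`, `ITraversableEnumerator<T>`, `SourceEnumerator<T>`, `SelectEnumerator<TIn, TOut>` and `GraphWalker` are all hypothetical names. The idea is that each operator's enumerator exposes the enumerators it pulls from, so whoever holds the outermost one can walk back to the sources, for instance to collect checkpoint state (not modelled here):

```csharp
using System;
using System.Collections;
using System.Collections.Generic;

// Hypothetical non-generic view of a node in the pull-side operator graph.
public interface IOperatorNode
{
    string Name { get; }
    IEnumerable<IOperatorNode> Inputs { get; }
}

// Hypothetical "traversable enumerator": an IEnumerator<T> that is also a graph node.
public interface ITraversableEnumerator<out T> : IEnumerator<T>, IOperatorNode { }

// Leaf node: wraps an ordinary enumerator and has no inputs.
public sealed class SourceEnumerator<T> : ITraversableEnumerator<T>
{
    private readonly IEnumerator<T> _inner;
    public SourceEnumerator(string name, IEnumerable<T> items)
    {
        Name = name;
        _inner = items.GetEnumerator();
    }
    public string Name { get; }
    public IEnumerable<IOperatorNode> Inputs => Array.Empty<IOperatorNode>();
    public T Current => _inner.Current;
    object IEnumerator.Current => Current;
    public bool MoveNext() => _inner.MoveNext();
    public void Reset() => throw new NotSupportedException();
    public void Dispose() => _inner.Dispose();
}

// Select-style operator: pulls from its input and exposes that input for traversal.
public sealed class SelectEnumerator<TIn, TOut> : ITraversableEnumerator<TOut>
{
    private readonly ITraversableEnumerator<TIn> _input;
    private readonly Func<TIn, TOut> _selector;
    public SelectEnumerator(ITraversableEnumerator<TIn> input, Func<TIn, TOut> selector)
    {
        _input = input;
        _selector = selector;
    }
    public string Name => $"Select<{typeof(TIn).Name},{typeof(TOut).Name}>";
    public IEnumerable<IOperatorNode> Inputs => new IOperatorNode[] { _input };
    public TOut Current { get; private set; }
    object IEnumerator.Current => Current;
    public bool MoveNext()
    {
        if (!_input.MoveNext()) return false;
        Current = _selector(_input.Current);
        return true;
    }
    public void Reset() => throw new NotSupportedException();
    public void Dispose() => _input.Dispose(); // termination flows upstream
}

public static class GraphWalker
{
    // Depth-first walk from the consumer-facing enumerator back towards the sources.
    public static List<string> Walk(IOperatorNode root)
    {
        var visited = new List<string>();
        var stack = new Stack<IOperatorNode>();
        stack.Push(root);
        while (stack.Count > 0)
        {
            IOperatorNode node = stack.Pop();
            visited.Add(node.Name);
            foreach (IOperatorNode input in node.Inputs) stack.Push(input);
        }
        return visited;
    }
}
```

For example, wrapping a source named "Numbers" in a `Select` from `int` to `int` and then a `Select` from `int` to `string`, and calling `GraphWalker.Walk` on the outermost enumerator, lists the two `Select` nodes followed by "Numbers". Because the walk only needs the enumerator the consumer already holds, the same shape could in principle be extended across a push-to-pull boundary if the buffering adapter also implemented `IOperatorNode`.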
