Skip to content

Commit

Permalink
Add the overload Stream.fromPublisher
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Nov 25, 2024
1 parent 8406c16 commit a67b91f
Showing 1 changed file with 50 additions and 2 deletions.
52 changes: 50 additions & 2 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3883,6 +3883,56 @@ object Stream extends StreamLowPriority {
await
}

/** Creates a [[Stream]] from a `subscribe` function;
* analogous to a `Publisher`, but effectual.
*
* This function is useful when you actually need to provide a subscriber to a third-party.
*
* @example {{{
* scala> import cats.effect.IO
* scala> import java.util.concurrent.Flow.{Publisher, Subscriber}
* scala>
* scala> def thirdPartyLibrary(subscriber: Subscriber[Int]): Unit = {
* | def somePublisher: Publisher[Int] = ???
* | somePublisher.subscribe(subscriber)
* | }
* scala>
* scala> // Interop with the third party library.
* scala> Stream.fromPublisher[IO, Int](chunkSize = 16) { subscriber =>
* | IO.println("Subscribing!") >>
* | IO.delay(thirdPartyLibrary(subscriber)) >>
* | IO.println("Subscribed!")
* | }
* res0: Stream[IO, Int] = Stream(..)
* }}}
*
* @note The subscribe function will not be executed until the stream is run.
*
* @see the overload that only requires a [[Publisher]].
*
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
* The stream will not emit new element until,
* either the `Chunk` is filled or the publisher finishes.
* @param subscribe The effectual function that will be used to initiate the consumption process,
* it receives a [[Subscriber]] that should be used to subscribe to a [[Publisher]].
* The `subscribe` operation must be called exactly once.
*/
def fromPublisher[F[_], A](
chunkSize: Int
)(
subscribe: Subscriber[A] => F[Unit]
)(implicit
F: Async[F]
): Stream[F, A] =
Stream
.eval(interop.flow.StreamSubscriber[F, A](chunkSize))
.flatMap { subscriber =>
subscriber.stream(subscribe(subscriber))
}

/** Creates a [[Stream]] from a [[Publisher]].
*
* @example {{{
Expand All @@ -3900,8 +3950,6 @@ object Stream extends StreamLowPriority {
*
* @note The [[Publisher]] will not receive a [[Subscriber]] until the stream is run.
*
* @see the `toStream` extension method added to `Publisher`
*
* @param publisher The [[Publisher]] to consume.
* @param chunkSize setup the number of elements asked each time from the [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
Expand Down

0 comments on commit a67b91f

Please sign in to comment.