diff --git a/core/shared/src/main/scala/fs2/fs2.scala b/core/shared/src/main/scala/fs2/fs2.scala index 8588110b32..ea43cca56e 100644 --- a/core/shared/src/main/scala/fs2/fs2.scala +++ b/core/shared/src/main/scala/fs2/fs2.scala @@ -19,6 +19,9 @@ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */ +import java.util.concurrent.Flow.Processor +import cats.effect.Async + package object fs2 { /** A stream transformation represented as a function from stream to stream. @@ -27,6 +30,36 @@ package object fs2 { */ type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O] + object Pipe { + final class FromProcessorPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { + def apply[I, O]( + processor: Processor[I, O], + chunkSize: Int + )(implicit + F: Async[F] + ): Pipe[F, I, O] = + new interop.flow.ProcessorPipe(processor, chunkSize) + } + + /** Creates a [[Pipe]] from the given [[Processor]]. + * + * The input stream won't be consumed until you request elements from the output stream, + * and thus the processor is not initiated until then. + * + * @note The [[Pipe]] can be reused multiple times as long as the [[Processor]] can be reused. + * Each invocation of the pipe will create and manage its own internal [[Publisher]] and [[Subscriber]], + * and use them to subscribe to and from the [[Processor]] respectively. + * + * @param [[processor]] the [[Processor]] that represents the [[Pipe]] logic. + * @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]]. + * A high number may be useful if the publisher is triggering from IO, + * like requesting elements from a database. + * A high number will also lead to more elements in memory. + */ + def fromProcessor[F[_]]: FromProcessorPartiallyApplied[F] = + new FromProcessorPartiallyApplied[F](dummy = true) + } + /** A stream transformation that combines two streams in to a single stream, * represented as a function from two streams to a single stream. * diff --git a/core/shared/src/main/scala/fs2/interop/flow/package.scala b/core/shared/src/main/scala/fs2/interop/flow/package.scala index b8988fd5ee..4fb25aec3d 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/package.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/package.scala @@ -198,24 +198,6 @@ package object flow { ): Stream[F, Nothing] = StreamSubscription.subscribe(stream, subscriber) - /** Creates a [[Pipe]] from the given [[Processor]] - * - * The input stream won't be consumed until you request elements from the output stream, - * and thus the processor is not initiated until then. - * - * @note The [[Pipe]] can be reused multiple times as long as the [[Processor]] can be reused. - * Each invocation of the pipe will create and manage its own internal [[Publisher]] and [[Subscriber]], - * and use them to subscribe to and from the [[Processor]] respectively. - * - * @param [[processor]] the [[Processor]] that represents the [[Pipe]] logic. - * @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]]. - * A high number may be useful if the publisher is triggering from IO, - * like requesting elements from a database. - * A high number will also lead to more elements in memory. - */ - def processorToPipe[F[_]]: syntax.FromProcessorPartiallyApplied[F] = - new syntax.FromProcessorPartiallyApplied[F](dummy = true) - /** A default value for the `chunkSize` argument, * that may be used in the absence of other constraints; * we encourage choosing an appropriate value consciously. diff --git a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala index 4681066280..24536dbd26 100644 --- a/core/shared/src/main/scala/fs2/interop/flow/syntax.scala +++ b/core/shared/src/main/scala/fs2/interop/flow/syntax.scala @@ -25,7 +25,7 @@ package flow import cats.effect.{Async, Resource} -import java.util.concurrent.Flow.{Processor, Publisher, Subscriber} +import java.util.concurrent.Flow.{Publisher, Subscriber} object syntax { implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal { @@ -46,6 +46,7 @@ object syntax { flow.subscribeStream(stream, subscriber) } + // TODO: Move to the Stream companion object when removing the deprecated flow package object and syntax. final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { def apply[A]( publisher: Publisher[A], @@ -57,14 +58,4 @@ object syntax { F.delay(publisher.subscribe(subscriber)) } } - - final class FromProcessorPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal { - def apply[I, O]( - processor: Processor[I, O], - chunkSize: Int - )(implicit - F: Async[F] - ): Pipe[F, I, O] = - new ProcessorPipe(processor, chunkSize) - } } diff --git a/core/shared/src/test/scala/fs2/interop/flow/ProcessorPipeSpec.scala b/core/shared/src/test/scala/fs2/interop/flow/ProcessorPipeSpec.scala index 968e5ddbce..b4ca0bb568 100644 --- a/core/shared/src/test/scala/fs2/interop/flow/ProcessorPipeSpec.scala +++ b/core/shared/src/test/scala/fs2/interop/flow/ProcessorPipeSpec.scala @@ -36,7 +36,7 @@ final class ProcessorPipeSpec extends Fs2Suite { chunkSize = bufferSize ) - val pipe = processorToPipe[IO]( + val pipe = Pipe.fromProcessor[IO]( processor, chunkSize = bufferSize )