Skip to content

Commit

Permalink
Remove processorToPipe from interop.flow package object
Browse files Browse the repository at this point in the history
  • Loading branch information
BalmungSan committed Nov 25, 2024
1 parent 771e658 commit f4b825d
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 30 deletions.
33 changes: 33 additions & 0 deletions core/shared/src/main/scala/fs2/fs2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

import java.util.concurrent.Flow.Processor
import cats.effect.Async

package object fs2 {

/** A stream transformation represented as a function from stream to stream.
Expand All @@ -27,6 +30,36 @@ package object fs2 {
*/
type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O]

object Pipe {
final class FromProcessorPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
def apply[I, O](
processor: Processor[I, O],
chunkSize: Int
)(implicit
F: Async[F]
): Pipe[F, I, O] =
new interop.flow.ProcessorPipe(processor, chunkSize)
}

/** Creates a [[Pipe]] from the given [[Processor]].
*
* The input stream won't be consumed until you request elements from the output stream,
* and thus the processor is not initiated until then.
*
* @note The [[Pipe]] can be reused multiple times as long as the [[Processor]] can be reused.
* Each invocation of the pipe will create and manage its own internal [[Publisher]] and [[Subscriber]],
* and use them to subscribe to and from the [[Processor]] respectively.
*
* @param [[processor]] the [[Processor]] that represents the [[Pipe]] logic.
* @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
*/
def fromProcessor[F[_]]: FromProcessorPartiallyApplied[F] =
new FromProcessorPartiallyApplied[F](dummy = true)
}

/** A stream transformation that combines two streams in to a single stream,
* represented as a function from two streams to a single stream.
*
Expand Down
18 changes: 0 additions & 18 deletions core/shared/src/main/scala/fs2/interop/flow/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -198,24 +198,6 @@ package object flow {
): Stream[F, Nothing] =
StreamSubscription.subscribe(stream, subscriber)

/** Creates a [[Pipe]] from the given [[Processor]]
*
* The input stream won't be consumed until you request elements from the output stream,
* and thus the processor is not initiated until then.
*
* @note The [[Pipe]] can be reused multiple times as long as the [[Processor]] can be reused.
* Each invocation of the pipe will create and manage its own internal [[Publisher]] and [[Subscriber]],
* and use them to subscribe to and from the [[Processor]] respectively.
*
* @param [[processor]] the [[Processor]] that represents the [[Pipe]] logic.
* @param chunkSize setup the number of elements asked each time from the upstream [[Publisher]].
* A high number may be useful if the publisher is triggering from IO,
* like requesting elements from a database.
* A high number will also lead to more elements in memory.
*/
def processorToPipe[F[_]]: syntax.FromProcessorPartiallyApplied[F] =
new syntax.FromProcessorPartiallyApplied[F](dummy = true)

/** A default value for the `chunkSize` argument,
* that may be used in the absence of other constraints;
* we encourage choosing an appropriate value consciously.
Expand Down
13 changes: 2 additions & 11 deletions core/shared/src/main/scala/fs2/interop/flow/syntax.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ package flow

import cats.effect.{Async, Resource}

import java.util.concurrent.Flow.{Processor, Publisher, Subscriber}
import java.util.concurrent.Flow.{Publisher, Subscriber}

object syntax {
implicit final class PublisherOps[A](private val publisher: Publisher[A]) extends AnyVal {
Expand All @@ -46,6 +46,7 @@ object syntax {
flow.subscribeStream(stream, subscriber)
}

// TODO: Move to the Stream companion object when removing the deprecated flow package object and syntax.
final class FromPublisherPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
def apply[A](
publisher: Publisher[A],
Expand All @@ -57,14 +58,4 @@ object syntax {
F.delay(publisher.subscribe(subscriber))
}
}

final class FromProcessorPartiallyApplied[F[_]](private val dummy: Boolean) extends AnyVal {
def apply[I, O](
processor: Processor[I, O],
chunkSize: Int
)(implicit
F: Async[F]
): Pipe[F, I, O] =
new ProcessorPipe(processor, chunkSize)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ final class ProcessorPipeSpec extends Fs2Suite {
chunkSize = bufferSize
)

val pipe = processorToPipe[IO](
val pipe = Pipe.fromProcessor[IO](
processor,
chunkSize = bufferSize
)
Expand Down

0 comments on commit f4b825d

Please sign in to comment.