Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit 584fc57

Browse files
authored
Merge pull request #2 from danslapman/future-abstraction
Fixes #1
2 parents 4cbaa6a + bae18ae commit 584fc57

File tree

13 files changed

+221
-110
lines changed

13 files changed

+221
-110
lines changed

build.sbt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
name := "asyncstreams"
22

3-
version := "0.4-SNAPSHOT"
3+
version := "0.4"
44

55
scalaVersion := "2.11.8"
66

77
addCompilerPlugin("org.spire-math" %% "kind-projector" % "0.9.3")
88

99
libraryDependencies ++= Seq(
1010
"org.scalaz" %% "scalaz-core" % "7.2.8",
11+
"com.twitter" %% "util-core" % "6.41.0" % Test,
1112
"org.scalatest" %% "scalatest" % "3.0.1" % Test
1213
)

copying.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
22
Version 2, December 2004
33

4-
Copyright (C) 2016 Daniil Smirnov <[email protected]>
4+
Copyright (C) 2016 - 2017 Daniil Smirnov <[email protected]>
55

66
Everyone is permitted to copy and distribute verbatim or modified
77
copies of this license document, and changing it is allowed as long

src/main/scala/asyncstreams/AsyncStream.scala

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,74 +3,74 @@ package asyncstreams
33
import scala.annotation.unchecked.{uncheckedVariance => uV}
44
import scala.collection.GenIterable
55
import scala.collection.generic.CanBuildFrom
6-
import scala.concurrent.{ExecutionContext, Future}
76
import scala.language.higherKinds
7+
import scalaz.Monad
8+
import scalaz.syntax.monad._
89

9-
case class AsyncStream[A](data: Future[Step[A, AsyncStream[A]]]) {
10+
case class AsyncStream[F[+_]: Monad, A](data: F[Step[A, AsyncStream[F, A]]]) {
1011
import AsyncStream._
1112

12-
def foldLeft[B](start: B)(f: (B, A) => B)(implicit executor: ExecutionContext): Future[B] = {
13-
def impl(d: Future[Step[A, AsyncStream[A]]], acc: Future[B]): Future[B] =
13+
def foldLeft[B](start: B)(f: (B, A) => B): F[B] = {
14+
def impl(d: F[Step[A, AsyncStream[F, A]]], acc: F[B]): F[B] =
1415
d.flatMap {
1516
case END => acc
1617
case step => impl(step.rest.data, acc map (b => f(b, step.value)))
1718
}
1819

19-
impl(data, Future(start))
20+
impl(data, start.point[F])
2021
}
2122

22-
def to[Col[_]](implicit executor: ExecutionContext, cbf: CanBuildFrom[Nothing, A, Col[A @uV]]): Future[Col[A]] =
23+
def to[Col[_]](implicit cbf: CanBuildFrom[Nothing, A, Col[A @uV]]): F[Col[A]] =
2324
foldLeft(cbf())((col, el) => col += el).map(_.result())
2425

2526

26-
def takeWhile(p: A => Boolean)(implicit executor: ExecutionContext): AsyncStream[A] =
27-
new AsyncStream[A](data map {
27+
def takeWhile(p: A => Boolean): AsyncStream[F, A] =
28+
new AsyncStream[F, A](data map {
2829
case END => END
2930
case step if !p(step.value) => END
3031
case step => Step(step.value, step.rest.takeWhile(p))
3132
})
3233

3334

34-
def take(n: Int)(implicit executor: ExecutionContext): AsyncStream[A] =
35+
def take(n: Int): AsyncStream[F, A] =
3536
if (n <= 0) nil
3637
else AsyncStream(data.map {
3738
case END => END
3839
case p => Step(p.value, p.rest.take(n - 1))
3940
})
4041

41-
def foreach[U](f: (A) => U)(implicit executor: ExecutionContext): Future[Unit] =
42+
def foreach[U](f: (A) => U): F[Unit] =
4243
foldLeft(())((_: Unit, a: A) => {f(a); ()})
4344

44-
def foreachF[U](f: (A) => Future[U])(implicit executor: ExecutionContext): Future[Unit] =
45-
foldLeft(Future(()))((fu: Future[Unit], a: A) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(identity)
45+
def foreachF[U](f: (A) => F[U]): F[Unit] =
46+
foldLeft(().point[F])((fu: F[Unit], a: A) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(identity)
4647

47-
def flatten[B](implicit asIterable: A => GenIterable[B], executor: ExecutionContext): AsyncStream[B] = {
48-
val streamChunk = (p: Step[A, AsyncStream[A]]) =>
49-
concat(generate(asIterable(p.value))(it => if (it.nonEmpty) Future(it.head, it.tail) else ENDF), p.rest.flatten)
48+
def flatten[B](implicit asIterable: A => GenIterable[B]): AsyncStream[F, B] = {
49+
val streamChunk = (p: Step[A, AsyncStream[F, A]]) =>
50+
concat(generate(asIterable(p.value))(it => if (it.nonEmpty) (it.head, it.tail).point[F] else ENDF[F]), p.rest.flatten)
5051

5152
AsyncStream(data.flatMap {
52-
case END => ENDF
53+
case END => ENDF[F]
5354
case step => streamChunk(step).data
5455
})
5556
}
5657
}
5758

58-
5959
object AsyncStream {
60-
def nil[A](implicit executor: ExecutionContext): AsyncStream[A] = AsyncStream(ENDF)
61-
def single[A](item: A)(implicit executor: ExecutionContext): AsyncStream[A] =
62-
AsyncStream(Future(Step(item, nil[A])))
60+
def nil[F[+_]: Monad, A]: AsyncStream[F, A] = AsyncStream(ENDF[F])
61+
def single[F[+_]: Monad, A](item: A): AsyncStream[F, A] =
62+
AsyncStream(Step(item, nil[F, A]).point[F])
6363

64-
def generate[S, A](start: S)(gen: S => Future[(A, S)])(implicit executor: ExecutionContext): AsyncStream[A] =
64+
def generate[F[+_]: Monad, S, A](start: S)(gen: S => F[(A, S)]): AsyncStream[F, A] =
6565
AsyncStream(gen(start).map {
6666
case END => END
6767
case (el, rest) => Step(el, generate(rest)(gen))
6868
})
6969

70-
def concat[A](s1: AsyncStream[A], s2: AsyncStream[A])(implicit executor: ExecutionContext): AsyncStream[A] =
71-
new AsyncStream[A](s1.data.flatMap {
70+
def concat[F[+_]: Monad, A](s1: AsyncStream[F, A], s2: AsyncStream[F, A]): AsyncStream[F, A] =
71+
new AsyncStream[F, A](s1.data.flatMap {
7272
case END => s2.data
73-
case step => Future(Step(step.value, concat(step.rest, s2)))
73+
case step => Step(step.value, concat(step.rest, s2)).point[F]
7474
})
7575
}
7676

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,19 @@
11
package asyncstreams
22

3-
import scala.concurrent.{ExecutionContext, Future}
4-
import scalaz.MonadPlus
3+
import scala.language.higherKinds
4+
import scalaz.{Monad, MonadPlus}
5+
import scalaz.syntax.monad._
56

6-
class AsyncStreamMonad(implicit executor: ExecutionContext) extends MonadPlus[AsyncStream] {
7+
class AsyncStreamMonad[F[+_]: Monad] extends MonadPlus[AsyncStream[F, ?]] {
78
import AsyncStream._
89

9-
override def empty[A] = nil[A]
10+
override def empty[A]: AsyncStream[F, A] = nil[F, A]
1011

11-
override def point[A](a: => A): AsyncStream[A] = single(a)
12+
override def point[A](a: => A): AsyncStream[F, A] = single(a)
1213

13-
override def plus[A](a: AsyncStream[A], b: => AsyncStream[A]) = concat(a, b)
14+
override def plus[A](a: AsyncStream[F, A], b: => AsyncStream[F, A]): AsyncStream[F, A] = concat(a, b)
1415

15-
override def bind[A, B](ma: AsyncStream[A])(f: A => AsyncStream[B]): AsyncStream[B] =
16+
override def bind[A, B](ma: AsyncStream[F, A])(f: A => AsyncStream[F, B]): AsyncStream[F, B] =
1617
AsyncStream(
1718
ma.data.flatMap {
1819
case END => ENDF
@@ -23,29 +24,28 @@ class AsyncStreamMonad(implicit executor: ExecutionContext) extends MonadPlus[As
2324
)
2425
}
2526

26-
trait AsyncStreamMonadFunctions {
27-
def foreach[A, S](stream: AsyncStream[A])(f: A => FState[S, _])
28-
(implicit ex: ExecutionContext): FState[S, Unit] =
27+
class AsyncStreamMonadFunctions[F[+_]: Monad] {
28+
def foreach[A, S](stream: AsyncStream[F, A])(f: A => FState[F, S, _]): FState[F, S, Unit] =
2929
FState(s => {
30-
stream.foldLeft(Future(s))((futureS, a) => futureS.flatMap(s2 => f(a)(s2).map(_._2)))
30+
stream.foldLeft(s.point[F])((fS, a) => fS.flatMap(s2 => f(a)(s2).map(_._2)))
3131
.flatMap(identity).map(((), _))
3232
})
3333

34-
def isEmpty[A, S](stream: AsyncStream[A])(implicit ex: ExecutionContext): FState[S, Boolean] =
34+
def isEmpty[A, S](stream: AsyncStream[F, A]): FState[F, S, Boolean] =
3535
FState(s => stream.data.map(step => (step eq END, s)))
3636

37-
def isEmpty[A, S : FStateMonad](f: S => AsyncStream[A])(implicit fsm: FStateMonad[S], ex: ExecutionContext): FState[S, Boolean] =
37+
def isEmpty[A, S](f: S => AsyncStream[F, A])(implicit fsm: FStateMonad[F, S]): FState[F, S, Boolean] =
3838
fsm.fcondS((s: S) => isEmpty(f(s)))
3939

40-
def notEmpty[A, S](stream: AsyncStream[A])(implicit ex: ExecutionContext): FState[S, Boolean] =
40+
def notEmpty[A, S](stream: AsyncStream[F, A]): FState[F, S, Boolean] =
4141
FState(s => stream.data map (step => (!(step eq END), s)))
4242

43-
def notEmpty[A, S](f: S => AsyncStream[A])(implicit fsm: FStateMonad[S], ex: ExecutionContext): FState[S, Boolean] =
43+
def notEmpty[A, S](f: S => AsyncStream[F, A])(implicit fsm: FStateMonad[F, S]): FState[F, S, Boolean] =
4444
fsm.fcondS(s => notEmpty(f(s)))
4545

46-
def get[A, S](stream: AsyncStream[A])(implicit ex: ExecutionContext): FState[S, (A, AsyncStream[A])] =
46+
def get[A, S](stream: AsyncStream[F, A]): FState[F, S, (A, AsyncStream[F, A])] =
4747
FState(s => stream.data.map(step => ((step.value, step.rest), s)))
4848

49-
def generateS[S,A](start: S)(gen: FState[S, A])(implicit ex: ExecutionContext) =
49+
def generateS[S, A](start: S)(gen: FState[F, S, A]): AsyncStream[F, A] =
5050
AsyncStream.generate(start)(gen.func)
5151
}
Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,37 @@
11
package asyncstreams
22

3-
import scala.concurrent.{ExecutionContext, Future}
3+
import scala.language.higherKinds
4+
import scalaz.Monad
5+
import scalaz.syntax.monad._
46

5-
class FState[S, A](val func: S => Future[(A, S)]) {
7+
class FState[F[+_]: Monad, S, A](val func: S => F[(A, S)]) {
68
import FState._
79

8-
def apply(s: S) = func(s)
10+
def apply(s: S): F[(A, S)] = func(s)
911

10-
def flatMap[B](f: A => FState[S, B])(implicit ex: ExecutionContext): FState[S, B] = FState[S, B](
12+
def flatMap[B](f: A => FState[F, S, B]): FState[F, S, B] = FState[F, S, B](
1113
(s: S) => func(s).flatMap {
12-
case END => ENDF
14+
case END => ENDF[F]
1315
case (fst, snd) => f(fst)(snd)
1416
}
1517
)
1618

17-
def map[B](f: A => B)(implicit ex: ExecutionContext): FState[S, B] =
18-
flatMap((a: A) => FState.unit(f(a)))
19+
def map[B](f: A => B): FState[F, S, B] = flatMap((a: A) => FState.unit(f(a)))
1920

20-
def bind[B](f: A => FState[S, B])(implicit ex: ExecutionContext): FState[S, B] =
21+
def bind[B](f: A => FState[F, S, B]): FState[F, S, B] =
2122
FState((s: S) => func(s) flatMap {
22-
case END => ENDF
23+
case END => ENDF[F]
2324
case (fst, snd) => f(fst)(snd)
2425
})
2526

26-
def filter(p: A => Boolean)(implicit executor: ExecutionContext): FState[S, A] =
27-
bind(a => if (p(a)) unit(a) else empty[S, A])
27+
def filter(p: A => Boolean): FState[F, S, A] =
28+
bind(a => if (p(a)) unit(a) else empty[F, S, A])
2829

29-
def withFilter(p: A => Boolean)(implicit executor: ExecutionContext): FState[S, A] = filter(p)
30+
def withFilter(p: A => Boolean): FState[F, S, A] = filter(p)
3031
}
3132

3233
object FState {
33-
def apply[S, A](f: S => Future[(A, S)]) = new FState[S, A](f)
34-
def unit[S, A](a: => A)(implicit ex: ExecutionContext) = FState[S, A]((s: S) => Future((a, s)))
35-
def empty[S, A](implicit ex: ExecutionContext) = FState[S, A]((s: S) => ENDF)
34+
def apply[F[+_]: Monad, S, A](f: S => F[(A, S)]) = new FState[F, S, A](f)
35+
def unit[F[+_]: Monad, S, A](a: => A) = FState[F, S, A]((s: S) => (a, s).point[F])
36+
def empty[F[+_]: Monad, S, A] = FState[F, S, A]((s: S) => ENDF[F])
3637
}
Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,36 @@
11
package asyncstreams
22

3-
import scala.concurrent.{ExecutionContext, Future}
4-
import scalaz.MonadPlus
3+
import scala.language.higherKinds
4+
import scalaz.{Monad, MonadPlus}
5+
import scalaz.syntax.monad._
56

6-
class FStateMonad[S](implicit ex: ExecutionContext)
7-
extends MonadPlus[FState[S, ?]] with FStateMonadFunctions {
8-
type F[X] = FState[S, X]
7+
class FStateMonad[Fu[+_]: Monad, S] extends FStateMonadFunctions[Fu] with MonadPlus[FState[Fu, S, ?]] {
8+
type FS[X] = FState[Fu, S, X]
99

10-
override def empty[A]: F[A] = FState.empty[S, A]
10+
override def empty[A]: FS[A] = FState.empty[Fu, S, A]
1111

12-
override def point[A](a: => A): F[A] = FState.unit(a)
12+
override def point[A](a: => A): FS[A] = FState.unit(a)
1313

14-
override def bind[A, B](m: F[A])(f: A => F[B]): F[B] = m.bind(f)
14+
override def bind[A, B](m: FS[A])(f: A => FS[B]): FS[B] = m.bind(f)
1515

16-
override def plus[A](a: F[A],b: => F[A]): F[A] = bind(a)(_ => b)
16+
override def plus[A](a: FS[A], b: => FS[A]): FS[A] = bind(a)(_ => b)
1717

18-
def condS(f: S => Boolean): F[Boolean] = bind(getS[S])(vs => point(f(vs)))
19-
def fcondS(f: S => F[Boolean]): F[Boolean] = bind(getS[S])(f)
20-
def modS(f: S => S): F[S] = bind(getS[S])(vs => putS(f(vs)))
18+
def condS(f: S => Boolean): FS[Boolean] = bind(getS[S])(vs => point(f(vs)))
19+
def fcondS(f: S => FS[Boolean]): FS[Boolean] = bind(getS[S])(f)
20+
def modS(f: S => S): FS[S] = bind(getS[S])(vs => putS(f(vs)))
2121

22-
def forM_[A](cond: S => Boolean, mod: S => S)(action: => F[A]): F[Unit] =
22+
def forM_[A](cond: S => Boolean, mod: S => S)(action: => FS[A]): FS[Unit] =
2323
whileM_(condS(cond), bind(action)(va => modS(mod)))
2424
}
2525

26-
trait FStateMonadFunctions {
27-
def getS[S](implicit ex: ExecutionContext): FState[S, S] = FState((s: S) => Future((s, s)))
26+
class FStateMonadFunctions[Fu[+_]: Monad] {
27+
def getS[S]: FState[Fu, S, S] = FState((s: S) => (s, s).point[Fu])
2828

29-
def putS[S](news: S)(implicit ex: ExecutionContext): FState[S, S] = FState((_: S) => Future((news, news)))
29+
def putS[S](news: S): FState[Fu, S, S] = FState((_: S) => (news, news).point[Fu])
3030

31-
def condS[S](f: S => Boolean)(implicit m: FStateMonad[S]): FStateMonad[S]#F[Boolean] = m.condS(f)
31+
def condS[S](f: S => Boolean)(implicit m: FStateMonad[Fu, S]): FStateMonad[Fu, S]#FS[Boolean] = m.condS(f)
3232

33-
def fconds[S](f: S => FState[S, Boolean])(implicit m: FStateMonad[S]): FStateMonad[S]#F[Boolean] = m.fconds(f)
33+
def fconds[S](f: S => FState[Fu, S, Boolean])(implicit m: FStateMonad[Fu, S]): FStateMonad[Fu, S]#FS[Boolean] = m.fconds(f)
3434

35-
def modS[S](f: S => S)(implicit m: FStateMonad[S]): FStateMonad[S]#F[S] = m.modS(f)
35+
def modS[S](f: S => S)(implicit m: FStateMonad[Fu, S]): FStateMonad[Fu, S]#FS[S] = m.modS(f)
3636
}

src/main/scala/asyncstreams/Step.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
package asyncstreams
22

33
class Step[A, B](fp: A, sp: => B) {
4-
val value = fp
5-
lazy val rest = sp
4+
val value: A = fp
5+
lazy val rest: B = sp
66
}
77

88
object Step {
Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
1-
import scala.concurrent.{ExecutionContext, Future}
1+
import scala.language.higherKinds
22
import scalaz.Monad
33

44
package object asyncstreams {
55
final val END: Null = null
6-
final def ENDF(implicit executor: ExecutionContext): Future[Null] = Future(END)
6+
final def ENDF[F[+_]: Monad](implicit fm: Monad[F]): F[Null] = fm.point(END)
77

8-
implicit def asyncStreamInstance(implicit executor: ExecutionContext): Monad[AsyncStream] =
9-
new AsyncStreamMonad
8+
implicit def asyncStreamInstance[F[+_]: Monad]: Monad[AsyncStream[F, ?]] = new AsyncStreamMonad[F]
109

11-
def fStateInstance[S](implicit executor: ExecutionContext) = new FStateMonad[S]
10+
def fStateInstance[F[+_]: Monad, S] = new FStateMonad[F, S]
1211

13-
object monadops extends FStateMonadFunctions with AsyncStreamMonadFunctions
12+
def fStateOps[F[+_]: Monad] = new FStateMonadFunctions[F]
13+
14+
def streamOps[F[+_]: Monad] = new AsyncStreamMonadFunctions[F]
1415
}

src/test/scala/asyncstreams/AsyncStreamMonadicOperationsTests.scala renamed to src/test/scala/asyncstreams/scalaFutures/AsyncStreamMonadicOperationsTests.scala

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
1-
package asyncstreams
1+
package asyncstreams.scalaFutures
22

3-
import monadops._
4-
import AsyncStream._
3+
import asyncstreams.AsyncStream.generate
4+
import asyncstreams.{streamOps, fStateOps}
5+
import asyncstreams.{AsyncStream, BaseSuite, ENDF, fStateInstance}
56

7+
import scala.concurrent.ExecutionContext.Implicits.global
68
import scala.concurrent.duration._
79
import scala.concurrent.{Await, Future}
8-
import scala.concurrent.ExecutionContext.Implicits.global
10+
import scalaz.std.scalaFuture._
911

1012
class AsyncStreamMonadicOperationsTests extends BaseSuite {
11-
def makeStream(l: List[Int]) = generate(l)(l => if (l.isEmpty) ENDF else Future((l.head, l.tail)))
13+
private val so = streamOps[Future]
14+
import so._
15+
private val fso = fStateOps[Future]
16+
import fso._
17+
18+
def makeStream(l: List[Int]) = generate(l)(l => if (l.isEmpty) ENDF[Future] else Future((l.head, l.tail)))
1219

1320
private def wait[T](f: Future[T]): T = Await.result(f, 10.seconds)
1421

1522
test("foreach") {
16-
implicit val fsm = fStateInstance[Int]
23+
implicit val fsm = fStateInstance[Future, Int]
1724

1825
val fstate = for {
1926
_ <- foreach(makeStream(0 :: 1 :: 2 :: Nil)) {
@@ -26,8 +33,8 @@ class AsyncStreamMonadicOperationsTests extends BaseSuite {
2633
}
2734

2835
test("get, isEmpty") {
29-
case class State(counter: Int, stream: AsyncStream[Int])
30-
implicit val fsm = fStateInstance[State]
36+
case class State(counter: Int, stream: AsyncStream[Future, Int])
37+
implicit val fsm = fStateInstance[Future, State]
3138

3239
val stream = makeStream(0 :: 1 :: 2 :: 3 :: Nil)
3340

@@ -44,7 +51,7 @@ class AsyncStreamMonadicOperationsTests extends BaseSuite {
4451
}
4552

4653
test("FState as generator") {
47-
implicit val fsm = fStateInstance[Int]
54+
implicit val fsm = fStateInstance[Future, Int]
4855

4956
val stream = generateS(0) {
5057
for {
@@ -57,7 +64,7 @@ class AsyncStreamMonadicOperationsTests extends BaseSuite {
5764
}
5865

5966
test("Generate finite stream") {
60-
implicit val fsm = fStateInstance[Int]
67+
implicit val fsm = fStateInstance[Future, Int]
6168

6269
val stream = generateS(0) {
6370
for {

0 commit comments

Comments
 (0)