@@ -6,14 +6,14 @@ import scala.collection.generic.CanBuildFrom
6
6
import scala .concurrent .{ExecutionContext , Future }
7
7
import scala .language .higherKinds
8
8
9
- case class AsyncStream [A ](data : Future [Pair [A , AsyncStream [A ]]]) {
9
+ case class AsyncStream [A ](data : Future [Step [A , AsyncStream [A ]]]) {
10
10
import AsyncStream ._
11
11
12
12
def foldLeft [B ](start : B )(f : (B , A ) => B )(implicit executor : ExecutionContext ): Future [B ] = {
13
- def impl (d : Future [Pair [A , AsyncStream [A ]]], acc : Future [B ]): Future [B ] =
13
+ def impl (d : Future [Step [A , AsyncStream [A ]]], acc : Future [B ]): Future [B ] =
14
14
d.flatMap {
15
15
case END => acc
16
- case pair => impl(pair.second .data, acc map (b => f(b, pair.first )))
16
+ case step => impl(step.rest .data, acc map (b => f(b, step.value )))
17
17
}
18
18
19
19
impl(data, Future (start))
@@ -26,16 +26,16 @@ case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) {
26
26
def takeWhile (p : A => Boolean )(implicit executor : ExecutionContext ): AsyncStream [A ] =
27
27
new AsyncStream [A ](data map {
28
28
case END => END
29
- case pair if ! p(pair.first ) => END
30
- case pair => Pair (pair.first, pair.second .takeWhile(p))
29
+ case step if ! p(step.value ) => END
30
+ case step => Step (step.value, step.rest .takeWhile(p))
31
31
})
32
32
33
33
34
34
def take (n : Int )(implicit executor : ExecutionContext ): AsyncStream [A ] =
35
35
if (n <= 0 ) nil
36
36
else AsyncStream (data.map {
37
37
case END => END
38
- case p => Pair (p.first , p.second .take(n - 1 ))
38
+ case p => Step (p.value , p.rest .take(n - 1 ))
39
39
})
40
40
41
41
def foreach [U ](f : (A ) => U )(implicit executor : ExecutionContext ): Future [Unit ] =
@@ -45,12 +45,12 @@ case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) {
45
45
foldLeft(Future (()))((fu : Future [Unit ], a : A ) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(u => u)
46
46
47
47
def flatten [B ](implicit asIterable : A => GenIterable [B ], executor : ExecutionContext ): AsyncStream [B ] = {
48
- val streamChunk = (p : Pair [A , AsyncStream [A ]]) =>
49
- concat(generate(asIterable(p.first ))(it => if (it.nonEmpty) Future (it.head, it.tail) else ENDF ), p.second .flatten)
48
+ val streamChunk = (p : Step [A , AsyncStream [A ]]) =>
49
+ concat(generate(asIterable(p.value ))(it => if (it.nonEmpty) Future (it.head, it.tail) else ENDF ), p.rest .flatten)
50
50
51
51
AsyncStream (data.flatMap {
52
52
case END => ENDF
53
- case pair => streamChunk(pair ).data
53
+ case step => streamChunk(step ).data
54
54
})
55
55
}
56
56
}
@@ -59,18 +59,18 @@ case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) {
59
59
object AsyncStream {
60
60
def nil [A ](implicit executor : ExecutionContext ): AsyncStream [A ] = AsyncStream (ENDF )
61
61
def single [A ](item : A )(implicit executor : ExecutionContext ): AsyncStream [A ] =
62
- AsyncStream (Future (Pair (item, nil[A ])))
62
+ AsyncStream (Future (Step (item, nil[A ])))
63
63
64
64
def generate [S , A ](start : S )(gen : S => Future [(A , S )])(implicit executor : ExecutionContext ): AsyncStream [A ] =
65
65
AsyncStream (gen(start).map {
66
66
case END => END
67
- case (el, rest) => Pair (el, generate(rest)(gen))
67
+ case (el, rest) => Step (el, generate(rest)(gen))
68
68
})
69
69
70
70
def concat [A ](s1 : AsyncStream [A ], s2 : AsyncStream [A ])(implicit executor : ExecutionContext ): AsyncStream [A ] =
71
71
new AsyncStream [A ](s1.data.flatMap {
72
72
case END => s2.data
73
- case p => Future (Pair (p.first , concat(p.second , s2)))
73
+ case step => Future (Step (step.value , concat(step.rest , s2)))
74
74
})
75
75
}
76
76
0 commit comments