Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit e18fc50

Browse files
committed
Add flatten
1 parent e7e4e21 commit e18fc50

File tree

4 files changed

+20
-3
lines changed

4 files changed

+20
-3
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name := "asyncstreams"
22

3-
version := "0.2"
3+
version := "0.3"
44

55
scalaVersion := "2.11.8"
66

readme.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ asyncstreams is available via jitpack:
2323
```
2424
resolvers += "jitpack" at "https://jitpack.io"
2525
26-
libraryDependencies += "com.github.danslapman" %% "asyncstreams" % "0.1"
26+
libraryDependencies += "com.github.danslapman" %% "asyncstreams" % "0.3"
2727
```
2828

2929
asyncstreams is based on [scala-async](https://github.com/iboltaev/scala-async) ideas.

src/main/scala/asyncstreams/AsyncStream.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package asyncstreams
22

33
import scala.annotation.unchecked.{uncheckedVariance => uV}
4+
import scala.collection.GenIterable
45
import scala.collection.generic.CanBuildFrom
56
import scala.concurrent.{ExecutionContext, Future}
67
import scala.language.higherKinds
@@ -42,6 +43,16 @@ case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) {
4243

4344
def foreachF[U](f: (A) => Future[U])(implicit executor: ExecutionContext): Future[Unit] =
4445
foldLeft(Future(()))((fu: Future[Unit], a: A) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(u => u)
46+
47+
def flatten[B](implicit asIterable: A => GenIterable[B], executor: ExecutionContext): AsyncStream[B] = {
48+
val streamChunk = (p: Pair[A, AsyncStream[A]]) =>
49+
concat(generate(asIterable(p.first))(it => if (it.nonEmpty) Future(it.head, it.tail) else ENDF), p.second.flatten)
50+
51+
AsyncStream(data.flatMap {
52+
case END => ENDF
53+
case pair => streamChunk(pair).data
54+
})
55+
}
4556
}
4657

4758

src/test/scala/asyncstreams/AsyncStreamTests.scala

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import scalaz.syntax.std.boolean._
1212

1313

1414
class AsyncStreamTests extends BaseSuite {
15-
private def makeStream(l: List[Int]) = generate(l)(l => ((l.nonEmpty)?(l.head, l.tail)|END).point[Future])
15+
private def makeStream[T](l: Iterable[T]) = generate(l)(l => (l.nonEmpty ?(l.head, l.tail)|END).point[Future])
1616

1717
private def makeInfStream = generate(0)(v => Future((v, v + 1)))
1818

@@ -73,4 +73,10 @@ class AsyncStreamTests extends BaseSuite {
7373
Await.ready(task, 10.seconds)
7474
buffer.to[List] shouldBe 0 :: 1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: 8 :: 9 :: Nil
7575
}
76+
77+
test("flatten") {
78+
val stream = makeStream(Vector.range(0, 1000000).grouped(10).to[Vector])
79+
val flatStream = stream.flatten
80+
wait(flatStream.to[Vector]) shouldBe Vector.range(0, 1000000)
81+
}
7682
}

0 commit comments

Comments
 (0)