Skip to content
This repository was archived by the owner on Jun 10, 2021. It is now read-only.

Commit e7e4e21

Browse files
committed
Add foreach and foreachF operations
1 parent 0d33b69 commit e7e4e21

File tree

3 files changed

+24
-1
lines changed

3 files changed

+24
-1
lines changed

build.sbt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name := "asyncstreams"
22

3-
version := "0.1"
3+
version := "0.2"
44

55
scalaVersion := "2.11.8"
66

src/main/scala/asyncstreams/AsyncStream.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,12 @@ case class AsyncStream[A](data: Future[Pair[A, AsyncStream[A]]]) {
3636
case END => END
3737
case p => Pair(p.first, p.second.take(n - 1))
3838
})
39+
40+
def foreach[U](f: (A) => U)(implicit executor: ExecutionContext): Future[Unit] =
41+
foldLeft(())((_: Unit, a: A) => {f(a); ()})
42+
43+
def foreachF[U](f: (A) => Future[U])(implicit executor: ExecutionContext): Future[Unit] =
44+
foldLeft(Future(()))((fu: Future[Unit], a: A) => fu.flatMap(_ => f(a)).map(_ => ())).flatMap(u => u)
3945
}
4046

4147

src/test/scala/asyncstreams/AsyncStreamTests.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import scala.concurrent.duration._
55
import scala.concurrent.ExecutionContext.Implicits.global
66
import asyncstreams.AsyncStream._
77

8+
import scala.collection.mutable.ArrayBuffer
89
import scalaz.std.scalaFuture._
910
import scalaz.syntax.monad._
1011
import scalaz.syntax.std.boolean._
@@ -56,4 +57,20 @@ class AsyncStreamTests extends BaseSuite {
5657
val r = makeInfStream.takeWhile(_ < 1000000)
5758
wait(r.to[List]) shouldBe (0 to 999999)
5859
}
60+
61+
test("foreach") {
62+
val stream = makeInfStream.take(10)
63+
val buffer = ArrayBuffer[Int]()
64+
val task = stream.foreach(i => buffer += i)
65+
Await.ready(task, 10.seconds)
66+
buffer.to[List] shouldBe 0 :: 1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: 8 :: 9 :: Nil
67+
}
68+
69+
test("foreachF") {
70+
val stream = makeInfStream.take(10)
71+
val buffer = ArrayBuffer[Int]()
72+
val task = stream.foreachF(i => Future(buffer += i))
73+
Await.ready(task, 10.seconds)
74+
buffer.to[List] shouldBe 0 :: 1 :: 2 :: 3 :: 4 :: 5 :: 6 :: 7 :: 8 :: 9 :: Nil
75+
}
5976
}

0 commit comments

Comments
 (0)