Skip to content

Commit 0e1e5cc

Browse files
author
cchantep
committed
Managed stream & advanced aggregation
1 parent f04b83f commit 0e1e5cc

File tree

5 files changed

+252
-20
lines changed

5 files changed

+252
-20
lines changed

documentation/manual/scalaGuide/main/sql/ScalaAnorm.md

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -206,30 +206,26 @@ val code: String = SQL"""
206206

207207
This feature tries to make faster, more concise and easier to read the way to retrieve data in Anorm. Please, feel free to use it wherever you see a combination of `SQL().on()` functions (or even an only `SQL()` without parameters).
208208

209-
## Retrieving data using the Stream API
209+
## Streaming results
210210

211-
The first way to access the results of a select query is to use the Stream API.
211+
Query results can be processed as stream of row, not having all loaded in memory.
212212

213-
When you call `apply()` on any SQL statement, you will receive a lazy `Stream` of `Row` instances, where each row can be seen as a dictionary:
213+
In the following example we will count the number of country rows.
214214

215215
```scala
216-
// Create an SQL query
217-
val selectCountries = SQL("Select * from Country")
218-
219-
// Transform the resulting Stream[Row] to a List[(String,String)]
220-
val countries = selectCountries().map(row =>
221-
row[String]("code") -> row[String]("name")
222-
).toList
216+
val countryCount: Long =
217+
SQL"Select count(*) as c from Country".fold(0l) { (c, _) => c + 1 }
223218
```
224219

225-
In the following example we will count the number of `Country` entries in the database, so the result set will be a single row with a single column:
220+
It's also possible to partially treat the row stream.
226221

227222
```scala
228-
// First retrieve the first row
229-
val firstRow = SQL("Select count(*) as c from Country").apply().head
230-
231-
// Next get the content of the 'c' column as Long
232-
val countryCount = firstRow[Long]("c")
223+
// Create an SQL query
224+
val books: List[String] = SQL("Select name from Books").
225+
foldWhile(List[String]()) { (list, row) =>
226+
if (list.size == 100) (list -> false) // stop with `list`
227+
else (list := row[String]("name")) -> true // continue with one more name
228+
}
233229
```
234230

235231
### Multi-value support

framework/src/anorm/src/main/scala/anorm/Anorm.scala

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,32 @@ private[anorm] trait Sql {
264264
/**
265265
* Executes this SQL statement as query, returns result as Row stream.
266266
*/
267+
@deprecated(message =
268+
"Use [[fold]] instead, which manages resources and memory", "2.4")
267269
def apply()(implicit connection: Connection): Stream[Row] =
268270
Sql.resultSetToStream(resultSet())
269271

272+
/**
273+
* Aggregates over the whole stream of row using the specified operator.
274+
*
275+
* @param z the start value
276+
* @param op Aggregate operator
277+
* @return Either list of failures at left, or aggregated value
278+
* @see #foldWhile
279+
*/
280+
def fold[T](z: T)(op: (T, Row) => T)(implicit connection: Connection): Either[List[Throwable], T] =
281+
Sql.fold(resultSet())(z) { (t, r) => op(t, r) -> true } acquireFor identity
282+
283+
/**
284+
* Aggregates over part of or the while row stream, using the specified operator.
285+
*
286+
* @param z the start value
287+
* @param op Aggregate operator. Returns aggregated value along with true if aggregation must process next value, or false to stop with current value.
288+
* @return Either list of failures at left, or aggregated value
289+
*/
290+
def foldWhile[T](z: T)(op: (T, Row) => (T, Boolean))(implicit connection: Connection): Either[List[Throwable], T] =
291+
Sql.fold(resultSet())(z) { (t, r) => op(t, r) } acquireFor identity
292+
270293
/**
271294
* Executes this statement as query (see [[executeQuery]]) and returns result.
272295
*/
@@ -364,24 +387,31 @@ object Sql { // TODO: Rename to SQL
364387
}
365388

366389
private[anorm] def as[T](parser: ResultSetParser[T], rs: ManagedResource[ResultSet])(implicit connection: Connection): T =
367-
parser(Sql.resultSetToStream(rs)) match {
390+
parser(Sql.resultSetToStream(rs) /* TODO: Refactory with fold */ ) match {
368391
case Success(a) => a
369392
case Error(e) => sys.error(e.toString)
370393
}
371394

372-
private def fold[T](res: ManagedResource[ResultSet])(initial: T)(f: (T, Row) => T): ManagedResource[T] = res map { rs =>
395+
/**
396+
* @param f Aggregate operator. Returns aggregated value along with true if aggregation must process next value, or false to stop with current value.
397+
*/
398+
private[anorm] def fold[T](res: ManagedResource[ResultSet])(initial: T)(f: (T, Row) => (T, Boolean)): ManagedResource[T] = res map { rs =>
373399
val rsMetaData: MetaData = metaData(rs)
374400
val columns: List[Int] = List.range(1, rsMetaData.columnCount + 1)
375401
@inline def data(rs: ResultSet) = columns.map(rs.getObject(_))
376402

377403
@annotation.tailrec
378404
def go(r: ResultSet, s: T): T =
379-
if (r.next()) go(r, f(s, new SqlRow(rsMetaData, data(rs)))) else s
405+
if (r.next()) {
406+
val (v, cont) = f(s, new SqlRow(rsMetaData, data(rs)))
407+
if (cont) go(r, v) else v
408+
} else s
380409

381410
go(rs, initial)
382411
}
383412

384-
private[anorm] def resultSetToStream(rs: ManagedResource[ResultSet]): Stream[Row] = fold(rs)(Stream.empty[Row])((s, r) => s :+ r) acquireAndGet identity
413+
// TODO: Remove when unused
414+
private[anorm] def resultSetToStream(rs: ManagedResource[ResultSet]): Stream[Row] = fold(rs)(Stream.empty[Row])((s, r) => (s :+ r) -> true) acquireAndGet identity
385415

386416
private case class SqlRow(metaData: MetaData, data: List[Any]) extends Row {
387417
override lazy val toString = "Row(" + metaData.ms.zip(data).map(t => s"'${t._1.column}': ${t._2} as ${t._1.clazz}").mkString(", ") + ")"

framework/src/anorm/src/main/scala/anorm/SqlQueryResult.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,43 @@ final case class SqlQueryResult(
4040
_.headOption.map(new SQLWarning(_)), Option(_))
4141

4242
/** Returns stream of row from query result. */
43+
@deprecated("Use [[fold]] or [[foldWhile]] instead, which manages resources and memory", "2.4")
4344
def apply()(implicit connection: Connection): Stream[Row] =
4445
Sql.resultSetToStream(resultSet)
4546

47+
/**
48+
* Aggregates over the whole row stream using the specified operator.
49+
*
50+
* @param z the start value
51+
* @param op Aggregate operator
52+
* @return Either list of failures at left, or aggregated value
53+
* @see #foldWhile
54+
*/
55+
def fold[T](z: T)(op: (T, Row) => T)(implicit connection: Connection): Either[List[Throwable], T] =
56+
Sql.fold(resultSet)(z) { (t, r) => op(t, r) -> true } acquireFor identity
57+
58+
/**
59+
* Aggregates over part of or the while row stream,
60+
* using the specified operator.
61+
*
62+
* @param z the start value
63+
* @param op Aggregate operator. Returns aggregated value along with true if aggregation must process next value, or false to stop with current value.
64+
* @return Either list of failures at left, or aggregated value
65+
*/
66+
def foldWhile[T](z: T)(op: (T, Row) => (T, Boolean))(implicit connection: Connection): Either[List[Throwable], T] =
67+
Sql.fold(resultSet)(z) { (t, r) => op(t, r) } acquireFor identity
68+
4669
def as[T](parser: ResultSetParser[T])(implicit connection: Connection): T =
4770
Sql.as(parser, resultSet)
4871

72+
// TODO: Scaladoc as `as` equivalent
4973
def list[A](rowParser: RowParser[A])(implicit connection: Connection): Seq[A] = as(rowParser.*)
5074

75+
// TODO: Scaladoc as `as` equivalent
5176
def single[A](rowParser: RowParser[A])(implicit connection: Connection): A =
5277
as(ResultSetParser.single(rowParser))
5378

79+
// TODO: Scaladoc as `as` equivalent
5480
def singleOpt[A](rowParser: RowParser[A])(implicit connection: Connection): Option[A] = as(ResultSetParser.singleOpt(rowParser))
5581

5682
@deprecated(message = "Use [[as]]", since = "2.3.2")

framework/src/anorm/src/test/scala/anorm/AnormSpec.scala

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,76 @@ object AnormSpec extends Specification with H2Database with AnormTest {
321321
}
322322
}
323323

324+
"Aggregation over all rows" should {
325+
"be empty when there is no result" in withQueryResult(QueryResult.Nil) {
326+
implicit c =>
327+
SQL"EXEC test".fold[Option[Int]](None)({ (_, _) => Some(0) }).
328+
aka("aggregated value") must beRight(None)
329+
330+
}
331+
332+
"be parsed from mapped result" in withQueryResult(
333+
rowList2(classOf[String] -> "foo", classOf[Int] -> "bar").
334+
append("row1", 100) :+ ("row2", 200)) { implicit c =>
335+
336+
SQL"SELECT * FROM test".fold(List[(String,Int)]())(
337+
{ (l, row) => l :+ (row[String]("foo") -> row[Int]("bar")) }).
338+
aka("tuple stream") must_== Right(List("row1" -> 100, "row2" -> 200))
339+
340+
}
341+
342+
"handle failure" in withQueryResult(
343+
rowList1(classOf[String] -> "foo") :+ "A" :+ "B") { implicit c =>
344+
var i = 0
345+
SQL"SELECT str".fold(Set[String]()) { (l, row) =>
346+
if (i == 0) { i = i+1; l + row[String]("foo") } else sys.error("Failure")
347+
348+
} aka "aggregate on failure" must beLike {
349+
case Left(err :: Nil) => err.getMessage aka "failure" must_== "Failure"
350+
} and (i aka "row count" must_== 1)
351+
}
352+
}
353+
354+
"Aggregation over variable number of rows" should {
355+
"be empty when there is no result" in withQueryResult(QueryResult.Nil) {
356+
implicit c =>
357+
SQL"EXEC test".foldWhile[Option[Int]](None)(
358+
{ (_, _) => Some(0) -> true }).
359+
aka("aggregated value") must beRight(None)
360+
361+
}
362+
363+
"be parsed from mapped result" in withQueryResult(
364+
rowList2(classOf[String] -> "foo", classOf[Int] -> "bar").
365+
append("row1", 100) :+ ("row2", 200)) { implicit c =>
366+
367+
SQL"SELECT * FROM test".foldWhile(List[(String,Int)]())({ (l, row) =>
368+
(l :+ (row[String]("foo") -> row[Int]("bar"))) -> true
369+
}) aka "tuple stream" must_== Right(List("row1" -> 100, "row2" -> 200))
370+
}
371+
372+
"handle failure" in withQueryResult(
373+
rowList1(classOf[String] -> "foo") :+ "A" :+ "B") { implicit c =>
374+
var i = 0
375+
SQL"SELECT str".foldWhile(Set[String]()) { (l, row) =>
376+
if (i == 0) { i = i+1; (l + row[String]("foo")) -> true }
377+
else sys.error("Failure")
378+
379+
} aka "aggregate on failure" must beLike {
380+
case Left(err :: Nil) => err.getMessage aka "failure" must_== "Failure"
381+
} and (i aka "row count" must_== 1)
382+
}
383+
384+
"stop after first row" in withQueryResult(
385+
rowList1(classOf[String] -> "foo") :+ "A" :+ "B") { implicit c =>
386+
var i = 0
387+
SQL"SELECT str".foldWhile(Set[String]()) { (l, row) =>
388+
if (i == 0) { i = i+1; (l + row[String]("foo")) -> true } else (l, false)
389+
390+
} aka "partial aggregate" must_== Right(Set("A"))
391+
}
392+
}
393+
324394
"Insertion" should {
325395
lazy implicit val con = connection(handleStatement withUpdateHandler {
326396
case UpdateExecution("INSERT ?", ExecutedParameter(1) :: Nil) => 1

framework/src/anorm/src/test/scala/anorm/SqlResultSpec.scala

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,116 @@ object SqlResultSpec extends org.specs2.mutable.Specification with H2Database {
203203
}
204204
}
205205

206+
"Aggregation over all rows" should {
207+
"release resources" in withQueryResult(
208+
stringList :+ "A" :+ "B" :+ "C") { implicit c =>
209+
210+
val res: SqlQueryResult = SQL"SELECT str".executeQuery()
211+
var closed = false
212+
val probe = resource.managed(
213+
new java.io.Closeable { def close() = closed = true })
214+
215+
var i = 0
216+
lazy val agg = res.copy(resultSet =
217+
res.resultSet.and(probe).map(_._1)).fold(List[Int]()) {
218+
(l, _) => i = i+1; l :+ i
219+
}
220+
221+
agg aka "aggregation" must_== Right(List(1, 2, 3)) and (
222+
closed aka "resource release" must beTrue) and (
223+
i aka "row count" must_== 3)
224+
225+
}
226+
227+
"release resources on exception" in withQueryResult(
228+
stringList :+ "A" :+ "B" :+ "C") { implicit c =>
229+
230+
val res: SqlQueryResult = SQL"SELECT str".executeQuery()
231+
var closed = false
232+
val probe = resource.managed(
233+
new java.io.Closeable { def close() = closed = true })
234+
235+
var i = 0
236+
lazy val agg = res.copy(resultSet = res.resultSet.and(probe).map(_._1)).
237+
fold(List[Int]()) { (l, _) =>
238+
if (i == 1) sys.error("Unexpected") else { i = i +1; l :+ i }
239+
}
240+
241+
agg aka "aggregation" must beLike {
242+
case Left(err :: Nil) =>
243+
err.getMessage aka "failure" must_== "Unexpected"
244+
} and (closed aka("resource release") must beTrue) and (
245+
i aka "row count" must_== 1)
246+
247+
}
248+
}
249+
250+
"Aggregation over variable number of rows" should {
251+
"release resources" in withQueryResult(
252+
stringList :+ "A" :+ "B" :+ "C") { implicit c =>
253+
254+
val res: SqlQueryResult = SQL"SELECT str".executeQuery()
255+
var closed = false
256+
val probe = resource.managed(
257+
new java.io.Closeable { def close() = closed = true })
258+
259+
var i = 0
260+
lazy val agg = res.copy(resultSet =
261+
res.resultSet.and(probe).map(_._1)).foldWhile(List[Int]()) {
262+
(l, _) => i = i+1; (l :+ i) -> true
263+
}
264+
265+
agg aka "aggregation" must_== Right(List(1, 2, 3)) and (
266+
closed aka "resource release" must beTrue) and (
267+
i aka "row count" must_== 3)
268+
269+
}
270+
271+
"release resources on exception" in withQueryResult(
272+
stringList :+ "A" :+ "B" :+ "C") { implicit c =>
273+
274+
val res: SqlQueryResult = SQL"SELECT str".executeQuery()
275+
var closed = false
276+
val probe = resource.managed(
277+
new java.io.Closeable { def close() = closed = true })
278+
279+
var i = 0
280+
lazy val agg = res.copy(resultSet = res.resultSet.and(probe).map(_._1)).
281+
foldWhile(List[Int]()) { (l, _) =>
282+
if (i == 1) sys.error("Unexpected") else {
283+
i = i +1; (l :+ i) -> true
284+
}
285+
}
286+
287+
agg aka "aggregation" must beLike {
288+
case Left(err :: Nil) =>
289+
err.getMessage aka "failure" must_== "Unexpected"
290+
} and (closed aka "resource release" must beTrue) and (
291+
i aka "row count" must_== 1)
292+
293+
}
294+
295+
"stop after second row & release resources" in withQueryResult(
296+
stringList :+ "A" :+ "B" :+ "C") { implicit c =>
297+
298+
val res: SqlQueryResult = SQL"SELECT str".executeQuery()
299+
var closed = false
300+
val probe = resource.managed(
301+
new java.io.Closeable { def close() = closed = true })
302+
303+
var i = 0
304+
lazy val agg = res.copy(resultSet = res.resultSet.and(probe).map(_._1)).
305+
foldWhile(List[Int]()) { (l, _) =>
306+
if (i == 2) (l, false) else { i = i +1; (l :+ i) -> true }
307+
}
308+
309+
agg aka "aggregation" must_== Right(List(1, 2)) and (
310+
closed aka "resource release" must beTrue) and (
311+
i aka "row count" must_== 2)
312+
313+
}
314+
}
315+
206316
"SQL warning" should {
207317
"not be there on success" in withQueryResult(stringList :+ "A") {
208318
implicit c =>

0 commit comments

Comments
 (0)