Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions docs/overview/aggregations/elastic_aggregation_range.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
id: elastic_aggregation_range
title: "Range Aggregation"
---

The `Range` aggregation is a multi-value aggregation enables the user to define a set of ranges. During the aggregation process, the values extracted from each document will be checked against each bucket range.

In order to use the `Range` aggregation import the following:
```scala
import zio.elasticsearch.aggregation.RangeAggregation
import zio.elasticsearch.ElasticAggregation.RangeAggregation
```

You can create a `Range` aggregation using the `rangeAggregation` method this way:
```scala
val aggregation: RangeAggregation = rangeAggregation(name = "rangeAggregation", field = "testField", range = SingleRange.to(23.9))
```

You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Range` aggregation using the `rangeAggregation` method this way:
```scala
// Document.intField must be number value, because of Min aggregation
val aggregation: RangeAggregation = rangeAggregation(name = "rangeAggregation", field = Document.intField, range = SingleRange.to(23.9))
```

You can find more information about `Range` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-bucket-range-aggregation.html).
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import zio.elasticsearch.ElasticAggregation._
import zio.elasticsearch.ElasticHighlight.highlight
import zio.elasticsearch.ElasticQuery.{script => _, _}
import zio.elasticsearch.ElasticSort.sortBy
import zio.elasticsearch.aggregation.AggregationOrder
import zio.elasticsearch.aggregation.{AggregationOrder, SingleRange}
import zio.elasticsearch.data.GeoPoint
import zio.elasticsearch.domain.{PartialTestDocument, TestDocument, TestSubDocument}
import zio.elasticsearch.executor.Executor
Expand All @@ -33,7 +33,7 @@ import zio.elasticsearch.query.sort.SortOrder._
import zio.elasticsearch.query.sort.SourceType.NumberType
import zio.elasticsearch.query.{Distance, FunctionScoreBoostMode, FunctionScoreFunction, InnerHits}
import zio.elasticsearch.request.{CreationOutcome, DeletionOutcome}
import zio.elasticsearch.result.{FilterAggregationResult, Item, MaxAggregationResult, UpdateByQueryResult}
import zio.elasticsearch.result._
import zio.elasticsearch.script.{Painless, Script}
import zio.json.ast.Json.{Arr, Str}
import zio.schema.codec.JsonCodec
Expand Down Expand Up @@ -256,6 +256,49 @@ object HttpExecutorSpec extends IntegrationSpec {
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using range aggregation") {
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
(firstDocumentId, firstDocument, secondDocumentId, secondDocument) =>
for {
_ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll))
_ <- Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 120))
)
_ <-
Executor.execute(
ElasticRequest
.upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 180))
.refreshTrue
)
aggregation = rangeAggregation(
name = "aggregationInt",
field = TestDocument.intField,
range = SingleRange(from = 100.0, to = 200.0)
)
aggsRes <-
Executor
.execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation))
.asRangeAggregation("aggregationInt")
} yield assert(aggsRes.head)(
equalTo(
RegularRangeAggregationResult(
Chunk(
RegularRangeAggregationBucketResult(
key = "100.0-200.0",
from = Some(100.0),
to = Some(200.0),
docCount = 2
)
)
)
)
)
}
} @@ around(
Executor.execute(ElasticRequest.createIndex(firstSearchIndex)),
Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie
),
test("aggregate using percentile ranks aggregation") {
val expectedResult = Map("500.0" -> 55.55555555555555, "600.0" -> 100.0)
checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument, genDocumentId, genTestDocument) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,66 @@ object ElasticAggregation {
final def minAggregation(name: String, field: String): MinAggregation =
Min(name = name, field = field, missing = None)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.RangeAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the field for which range aggregation will be executed
* @tparam A
* expected number type
* @param range
* the first range to be evaluated and transformed to bucket in [[zio.elasticsearch.aggregation.RangeAggregation]]
* @param ranges
* an array of ranges to be evaluated and transformed to buckets in
* [[zio.elasticsearch.aggregation.RangeAggregation]]
* @return
* an instance of [[zio.elasticsearch.aggregation.RangeAggregation]] that represents range aggregation to be
* performed.
*/
final def rangeAggregation[A: Numeric](
name: String,
field: Field[_, A],
range: SingleRange,
ranges: SingleRange*
Comment on lines +240 to +241
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this named SingleRange? Why not Range?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cannot have two case classes with the same name inside the same package. I don't like the SingleRange name either, but I can't think of a better approach right now.

): RangeAggregation =
Range(
name = name,
field = field.toString,
ranges = Chunk.fromIterable(range +: ranges),
keyed = None
)

/**
* Constructs an instance of [[zio.elasticsearch.aggregation.RangeAggregation]] using the specified parameters.
*
* @param name
* aggregation name
* @param field
* the field for which range aggregation will be executed
* @param range
* the first range to be evaluated and transformed to bucket in [[zio.elasticsearch.aggregation.RangeAggregation]]
* @param ranges
* an array of ranges to be evaluated and transformed to buckets in
* [[zio.elasticsearch.aggregation.RangeAggregation]]
* @return
* an instance of [[zio.elasticsearch.aggregation.RangeAggregation]] that represents range aggregation to be
* performed.
*/
final def rangeAggregation(
name: String,
field: String,
range: SingleRange,
ranges: SingleRange*
): RangeAggregation =
Range(
name = name,
field = field,
ranges = Chunk.fromIterable(range +: ranges),
keyed = None
)

/**
* Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MissingAggregation]] using the specified
* parameters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,63 @@ private[elasticsearch] final case class Percentiles(
}
}

private[elasticsearch] final case class SingleRange(
from: Option[Double],
to: Option[Double],
key: Option[String]
) { self =>
def from(value: Double): SingleRange = self.copy(from = Some(value))
def to(value: Double): SingleRange = self.copy(to = Some(value))
def key(value: String): SingleRange = self.copy(key = Some(value))
}

object SingleRange {

def from(value: Double): SingleRange =
SingleRange(from = Some(value), to = None, key = None)

def to(value: Double): SingleRange =
SingleRange(from = None, to = Some(value), key = None)

def apply(from: Double, to: Double): SingleRange =
SingleRange(from = Some(from), to = Some(to), key = None)

}

sealed trait RangeAggregation extends SingleElasticAggregation with WithAgg {
def asKeyed: Range
}

private[elasticsearch] final case class Range(
name: String,
field: String,
ranges: Chunk[SingleRange],
keyed: Option[Boolean]
) extends RangeAggregation { self =>

def asKeyed: Range = self.copy(keyed = Some(true))

def withAgg(agg: SingleElasticAggregation): MultipleAggregations =
multipleAggregations.aggregations(self, agg)

private[elasticsearch] def toJson: Json = {
val keyedJson: Json = keyed.fold(Obj())(m => Obj("keyed" -> m.toJson))

Obj(
name -> Obj(
"range" -> (Obj(
"field" -> field.toJson,
"ranges" -> Arr(ranges.map { r =>
r.from.fold(Obj())(m => Obj("from" -> m.toJson)) merge
r.to.fold(Obj())(m => Obj("to" -> m.toJson)) merge
r.key.fold(Obj())(m => Obj("key" -> m.toJson))
})
) merge keyedJson)
)
)
}
}

sealed trait StatsAggregation extends SingleElasticAggregation with HasMissing[StatsAggregation] with WithAgg

private[elasticsearch] final case class Stats(name: String, field: String, missing: Option[Double])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,30 @@ object AggregationResponse {
MaxAggregationResult(value)
case MinAggregationResponse(value) =>
MinAggregationResult(value)
case RegularRangeAggregationResponse(buckets) =>
RegularRangeAggregationResult(
buckets.map(b =>
RegularRangeAggregationBucketResult(
key = b.key,
to = b.to,
from = b.from,
docCount = b.docCount
)
)
)
case KeyedRangeAggregationResponse(buckets) =>
KeyedRangeAggregationResult(
buckets.map { case (k, v) =>
(
k,
KeyedRangeAggregationBucketResult(
to = v.to,
from = v.from,
docCount = v.docCount
)
)
}
)
case MissingAggregationResponse(value) =>
MissingAggregationResult(value)
case PercentileRanksAggregationResponse(values) =>
Expand Down Expand Up @@ -297,6 +321,54 @@ private[elasticsearch] object MinAggregationResponse {
implicit val decoder: JsonDecoder[MinAggregationResponse] = DeriveJsonDecoder.gen[MinAggregationResponse]
}

private[elasticsearch] final case class RegularRangeAggregationBucketResponse(
key: String,
to: Option[Double],
from: Option[Double],
@jsonField("doc_count")
docCount: Int
)
private[elasticsearch] object RegularRangeAggregationBucketResponse {
implicit val decoder: JsonDecoder[RegularRangeAggregationBucketResponse] =
DeriveJsonDecoder.gen[RegularRangeAggregationBucketResponse]
}

private[elasticsearch] final case class KeyedRangeAggregationBucketResponse(
to: Option[Double],
from: Option[Double],
@jsonField("doc_count")
docCount: Int
)
private[elasticsearch] object KeyedRangeAggregationBucketResponse {
implicit val decoder: JsonDecoder[KeyedRangeAggregationBucketResponse] =
DeriveJsonDecoder.gen[KeyedRangeAggregationBucketResponse]
}

private[elasticsearch] sealed trait RangeAggregationResponse extends AggregationResponse

private[elasticsearch] final case class RegularRangeAggregationResponse(
buckets: Chunk[RegularRangeAggregationBucketResponse]
) extends RangeAggregationResponse
private[elasticsearch] object RegularRangeAggregationResponse {
implicit val decoder: JsonDecoder[RegularRangeAggregationResponse] =
DeriveJsonDecoder.gen[RegularRangeAggregationResponse]
}

private[elasticsearch] final case class KeyedRangeAggregationResponse(
buckets: Map[String, KeyedRangeAggregationBucketResponse]
) extends RangeAggregationResponse
private[elasticsearch] object KeyedRangeAggregationResponse {
implicit val decoder: JsonDecoder[KeyedRangeAggregationResponse] =
DeriveJsonDecoder.gen[KeyedRangeAggregationResponse]
}

private[elasticsearch] object RangeAggregationResponse {
implicit val decoder: JsonDecoder[RangeAggregationResponse] =
RegularRangeAggregationResponse.decoder
.widen[RangeAggregationResponse]
.orElse(KeyedRangeAggregationResponse.decoder.widen[RangeAggregationResponse])
}

private[elasticsearch] final case class MissingAggregationResponse(@jsonField("doc_count") docCount: Int)
extends AggregationResponse

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse(
PercentileRanksAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("percentiles#") =>
PercentilesAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("range#") =>
RangeAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("stats#") =>
StatsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _)
case str if str.contains("sum#") =>
Expand Down
12 changes: 12 additions & 0 deletions modules/library/src/main/scala/zio/elasticsearch/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,18 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w
def asPercentilesAggregation(name: String): RIO[R, Option[PercentilesAggregationResult]] =
aggregationAs[PercentilesAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
* @param name
* the name of the aggregation to retrieve
* @return
* a [[RIO]] effect that, when executed, will produce the aggregation as instance of
* [[result.RangeAggregationResult]].
*/
def asRangeAggregation(name: String): RIO[R, Option[RangeAggregationResult]] =
aggregationAs[RangeAggregationResult](name)

/**
* Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,29 @@ final case class PercentileRanksAggregationResult private[elasticsearch] (values
final case class PercentilesAggregationResult private[elasticsearch] (values: Map[String, Double])
extends AggregationResult

private[elasticsearch] sealed trait RangeAggregationResult extends AggregationResult

private[elasticsearch] final case class RegularRangeAggregationBucketResult(
key: String,
to: Option[Double],
from: Option[Double],
docCount: Int
)

private[elasticsearch] final case class KeyedRangeAggregationBucketResult(
to: Option[Double],
from: Option[Double],
docCount: Int
)

private[elasticsearch] final case class RegularRangeAggregationResult(
buckets: Chunk[RegularRangeAggregationBucketResult]
) extends RangeAggregationResult

private[elasticsearch] final case class KeyedRangeAggregationResult(
buckets: Map[String, KeyedRangeAggregationBucketResult]
) extends RangeAggregationResult

Comment on lines +68 to +90
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep this alphabetically sorted.

final case class StatsAggregationResult private[elasticsearch] (
count: Int,
min: Double,
Expand Down
Loading
Loading