diff --git a/docs/overview/aggregations/elastic_aggregation_range.md b/docs/overview/aggregations/elastic_aggregation_range.md new file mode 100644 index 000000000..828f584d9 --- /dev/null +++ b/docs/overview/aggregations/elastic_aggregation_range.md @@ -0,0 +1,25 @@ +--- +id: elastic_aggregation_range +title: "Range Aggregation" +--- + +The `Range` aggregation is a multi-value aggregation enables the user to define a set of ranges. During the aggregation process, the values extracted from each document will be checked against each bucket range. + +In order to use the `Range` aggregation import the following: +```scala +import zio.elasticsearch.aggregation.RangeAggregation +import zio.elasticsearch.ElasticAggregation.RangeAggregation +``` + +You can create a `Range` aggregation using the `rangeAggregation` method this way: +```scala +val aggregation: RangeAggregation = rangeAggregation(name = "rangeAggregation", field = "testField", range = SingleRange.to(23.9)) +``` + +You can create a [type-safe](https://lambdaworks.github.io/zio-elasticsearch/overview/overview_zio_prelude_schema) `Range` aggregation using the `rangeAggregation` method this way: +```scala +// Document.intField must be number value, because of Min aggregation +val aggregation: RangeAggregation = rangeAggregation(name = "rangeAggregation", field = Document.intField, range = SingleRange.to(23.9)) +``` + +You can find more information about `Range` aggregation [here](https://www.elastic.co/guide/en/elasticsearch/reference/7.17/search-aggregations-bucket-range-aggregation.html). diff --git a/modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala b/modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala index 329c93bd4..d20cc833f 100644 --- a/modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala +++ b/modules/integration/src/test/scala/zio/elasticsearch/HttpExecutorSpec.scala @@ -21,7 +21,7 @@ import zio.elasticsearch.ElasticAggregation._ import zio.elasticsearch.ElasticHighlight.highlight import zio.elasticsearch.ElasticQuery.{script => _, _} import zio.elasticsearch.ElasticSort.sortBy -import zio.elasticsearch.aggregation.AggregationOrder +import zio.elasticsearch.aggregation.{AggregationOrder, SingleRange} import zio.elasticsearch.data.GeoPoint import zio.elasticsearch.domain.{PartialTestDocument, TestDocument, TestSubDocument} import zio.elasticsearch.executor.Executor @@ -33,7 +33,7 @@ import zio.elasticsearch.query.sort.SortOrder._ import zio.elasticsearch.query.sort.SourceType.NumberType import zio.elasticsearch.query.{Distance, FunctionScoreBoostMode, FunctionScoreFunction, InnerHits} import zio.elasticsearch.request.{CreationOutcome, DeletionOutcome} -import zio.elasticsearch.result.{FilterAggregationResult, Item, MaxAggregationResult, UpdateByQueryResult} +import zio.elasticsearch.result._ import zio.elasticsearch.script.{Painless, Script} import zio.json.ast.Json.{Arr, Str} import zio.schema.codec.JsonCodec @@ -256,6 +256,49 @@ object HttpExecutorSpec extends IntegrationSpec { Executor.execute(ElasticRequest.createIndex(firstSearchIndex)), Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie ), + test("aggregate using range aggregation") { + checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument) { + (firstDocumentId, firstDocument, secondDocumentId, secondDocument) => + for { + _ <- Executor.execute(ElasticRequest.deleteByQuery(firstSearchIndex, matchAll)) + _ <- Executor.execute( + ElasticRequest + .upsert[TestDocument](firstSearchIndex, firstDocumentId, firstDocument.copy(intField = 120)) + ) + _ <- + Executor.execute( + ElasticRequest + .upsert[TestDocument](firstSearchIndex, secondDocumentId, secondDocument.copy(intField = 180)) + .refreshTrue + ) + aggregation = rangeAggregation( + name = "aggregationInt", + field = TestDocument.intField, + range = SingleRange(from = 100.0, to = 200.0) + ) + aggsRes <- + Executor + .execute(ElasticRequest.aggregate(selectors = firstSearchIndex, aggregation = aggregation)) + .asRangeAggregation("aggregationInt") + } yield assert(aggsRes.head)( + equalTo( + RegularRangeAggregationResult( + Chunk( + RegularRangeAggregationBucketResult( + key = "100.0-200.0", + from = Some(100.0), + to = Some(200.0), + docCount = 2 + ) + ) + ) + ) + ) + } + } @@ around( + Executor.execute(ElasticRequest.createIndex(firstSearchIndex)), + Executor.execute(ElasticRequest.deleteIndex(firstSearchIndex)).orDie + ), test("aggregate using percentile ranks aggregation") { val expectedResult = Map("500.0" -> 55.55555555555555, "600.0" -> 100.0) checkOnce(genDocumentId, genTestDocument, genDocumentId, genTestDocument, genDocumentId, genTestDocument) { diff --git a/modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala b/modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala index 22c2b08b0..e8cf5aae7 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/ElasticAggregation.scala @@ -216,6 +216,66 @@ object ElasticAggregation { final def minAggregation(name: String, field: String): MinAggregation = Min(name = name, field = field, missing = None) + /** + * Constructs an instance of [[zio.elasticsearch.aggregation.RangeAggregation]] using the specified parameters. + * + * @param name + * aggregation name + * @param field + * the field for which range aggregation will be executed + * @tparam A + * expected number type + * @param range + * the first range to be evaluated and transformed to bucket in [[zio.elasticsearch.aggregation.RangeAggregation]] + * @param ranges + * an array of ranges to be evaluated and transformed to buckets in + * [[zio.elasticsearch.aggregation.RangeAggregation]] + * @return + * an instance of [[zio.elasticsearch.aggregation.RangeAggregation]] that represents range aggregation to be + * performed. + */ + final def rangeAggregation[A: Numeric]( + name: String, + field: Field[_, A], + range: SingleRange, + ranges: SingleRange* + ): RangeAggregation = + Range( + name = name, + field = field.toString, + ranges = Chunk.fromIterable(range +: ranges), + keyed = None + ) + + /** + * Constructs an instance of [[zio.elasticsearch.aggregation.RangeAggregation]] using the specified parameters. + * + * @param name + * aggregation name + * @param field + * the field for which range aggregation will be executed + * @param range + * the first range to be evaluated and transformed to bucket in [[zio.elasticsearch.aggregation.RangeAggregation]] + * @param ranges + * an array of ranges to be evaluated and transformed to buckets in + * [[zio.elasticsearch.aggregation.RangeAggregation]] + * @return + * an instance of [[zio.elasticsearch.aggregation.RangeAggregation]] that represents range aggregation to be + * performed. + */ + final def rangeAggregation( + name: String, + field: String, + range: SingleRange, + ranges: SingleRange* + ): RangeAggregation = + Range( + name = name, + field = field, + ranges = Chunk.fromIterable(range +: ranges), + keyed = None + ) + /** * Constructs a type-safe instance of [[zio.elasticsearch.aggregation.MissingAggregation]] using the specified * parameters. diff --git a/modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala b/modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala index ec194dc27..d969a4d1e 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/aggregation/Aggregations.scala @@ -351,6 +351,63 @@ private[elasticsearch] final case class Percentiles( } } +private[elasticsearch] final case class SingleRange( + from: Option[Double], + to: Option[Double], + key: Option[String] +) { self => + def from(value: Double): SingleRange = self.copy(from = Some(value)) + def to(value: Double): SingleRange = self.copy(to = Some(value)) + def key(value: String): SingleRange = self.copy(key = Some(value)) +} + +object SingleRange { + + def from(value: Double): SingleRange = + SingleRange(from = Some(value), to = None, key = None) + + def to(value: Double): SingleRange = + SingleRange(from = None, to = Some(value), key = None) + + def apply(from: Double, to: Double): SingleRange = + SingleRange(from = Some(from), to = Some(to), key = None) + +} + +sealed trait RangeAggregation extends SingleElasticAggregation with WithAgg { + def asKeyed: Range +} + +private[elasticsearch] final case class Range( + name: String, + field: String, + ranges: Chunk[SingleRange], + keyed: Option[Boolean] +) extends RangeAggregation { self => + + def asKeyed: Range = self.copy(keyed = Some(true)) + + def withAgg(agg: SingleElasticAggregation): MultipleAggregations = + multipleAggregations.aggregations(self, agg) + + private[elasticsearch] def toJson: Json = { + val keyedJson: Json = keyed.fold(Obj())(m => Obj("keyed" -> m.toJson)) + + Obj( + name -> Obj( + "range" -> (Obj( + "field" -> field.toJson, + "ranges" -> Arr(ranges.map { r => + r.from.fold(Obj())(m => Obj("from" -> m.toJson)) merge + r.to.fold(Obj())(m => Obj("to" -> m.toJson)) merge + r.key.fold(Obj())(m => Obj("key" -> m.toJson)) + }) + ) merge keyedJson) + ) + ) + } +} + sealed trait StatsAggregation extends SingleElasticAggregation with HasMissing[StatsAggregation] with WithAgg private[elasticsearch] final case class Stats(name: String, field: String, missing: Option[Double]) diff --git a/modules/library/src/main/scala/zio/elasticsearch/executor/response/AggregationResponse.scala b/modules/library/src/main/scala/zio/elasticsearch/executor/response/AggregationResponse.scala index 12497006e..a1af2397a 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/executor/response/AggregationResponse.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/executor/response/AggregationResponse.scala @@ -81,6 +81,30 @@ object AggregationResponse { MaxAggregationResult(value) case MinAggregationResponse(value) => MinAggregationResult(value) + case RegularRangeAggregationResponse(buckets) => + RegularRangeAggregationResult( + buckets.map(b => + RegularRangeAggregationBucketResult( + key = b.key, + to = b.to, + from = b.from, + docCount = b.docCount + ) + ) + ) + case KeyedRangeAggregationResponse(buckets) => + KeyedRangeAggregationResult( + buckets.map { case (k, v) => + ( + k, + KeyedRangeAggregationBucketResult( + to = v.to, + from = v.from, + docCount = v.docCount + ) + ) + } + ) case MissingAggregationResponse(value) => MissingAggregationResult(value) case PercentileRanksAggregationResponse(values) => @@ -297,6 +321,54 @@ private[elasticsearch] object MinAggregationResponse { implicit val decoder: JsonDecoder[MinAggregationResponse] = DeriveJsonDecoder.gen[MinAggregationResponse] } +private[elasticsearch] final case class RegularRangeAggregationBucketResponse( + key: String, + to: Option[Double], + from: Option[Double], + @jsonField("doc_count") + docCount: Int +) +private[elasticsearch] object RegularRangeAggregationBucketResponse { + implicit val decoder: JsonDecoder[RegularRangeAggregationBucketResponse] = + DeriveJsonDecoder.gen[RegularRangeAggregationBucketResponse] +} + +private[elasticsearch] final case class KeyedRangeAggregationBucketResponse( + to: Option[Double], + from: Option[Double], + @jsonField("doc_count") + docCount: Int +) +private[elasticsearch] object KeyedRangeAggregationBucketResponse { + implicit val decoder: JsonDecoder[KeyedRangeAggregationBucketResponse] = + DeriveJsonDecoder.gen[KeyedRangeAggregationBucketResponse] +} + +private[elasticsearch] sealed trait RangeAggregationResponse extends AggregationResponse + +private[elasticsearch] final case class RegularRangeAggregationResponse( + buckets: Chunk[RegularRangeAggregationBucketResponse] +) extends RangeAggregationResponse +private[elasticsearch] object RegularRangeAggregationResponse { + implicit val decoder: JsonDecoder[RegularRangeAggregationResponse] = + DeriveJsonDecoder.gen[RegularRangeAggregationResponse] +} + +private[elasticsearch] final case class KeyedRangeAggregationResponse( + buckets: Map[String, KeyedRangeAggregationBucketResponse] +) extends RangeAggregationResponse +private[elasticsearch] object KeyedRangeAggregationResponse { + implicit val decoder: JsonDecoder[KeyedRangeAggregationResponse] = + DeriveJsonDecoder.gen[KeyedRangeAggregationResponse] +} + +private[elasticsearch] object RangeAggregationResponse { + implicit val decoder: JsonDecoder[RangeAggregationResponse] = + RegularRangeAggregationResponse.decoder + .widen[RangeAggregationResponse] + .orElse(KeyedRangeAggregationResponse.decoder.widen[RangeAggregationResponse]) +} + private[elasticsearch] final case class MissingAggregationResponse(@jsonField("doc_count") docCount: Int) extends AggregationResponse diff --git a/modules/library/src/main/scala/zio/elasticsearch/executor/response/SearchWithAggregationsResponse.scala b/modules/library/src/main/scala/zio/elasticsearch/executor/response/SearchWithAggregationsResponse.scala index f02886732..528066c09 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/executor/response/SearchWithAggregationsResponse.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/executor/response/SearchWithAggregationsResponse.scala @@ -92,6 +92,8 @@ private[elasticsearch] final case class SearchWithAggregationsResponse( PercentileRanksAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _) case str if str.contains("percentiles#") => PercentilesAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _) + case str if str.contains("range#") => + RangeAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _) case str if str.contains("stats#") => StatsAggregationResponse.decoder.decodeJson(data.toString).map(field.split("#")(1) -> _) case str if str.contains("sum#") => diff --git a/modules/library/src/main/scala/zio/elasticsearch/package.scala b/modules/library/src/main/scala/zio/elasticsearch/package.scala index 8c7d1eefd..567849912 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/package.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/package.scala @@ -156,6 +156,18 @@ package object elasticsearch extends IndexNameNewtype with IndexPatternNewtype w def asPercentilesAggregation(name: String): RIO[R, Option[PercentilesAggregationResult]] = aggregationAs[PercentilesAggregationResult](name) + /** + * Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]]. + * + * @param name + * the name of the aggregation to retrieve + * @return + * a [[RIO]] effect that, when executed, will produce the aggregation as instance of + * [[result.RangeAggregationResult]]. + */ + def asRangeAggregation(name: String): RIO[R, Option[RangeAggregationResult]] = + aggregationAs[RangeAggregationResult](name) + /** * Executes the [[ElasticRequest.SearchRequest]] or the [[ElasticRequest.SearchAndAggregateRequest]]. * diff --git a/modules/library/src/main/scala/zio/elasticsearch/result/AggregationResult.scala b/modules/library/src/main/scala/zio/elasticsearch/result/AggregationResult.scala index 0f6037cf4..eb33651d8 100644 --- a/modules/library/src/main/scala/zio/elasticsearch/result/AggregationResult.scala +++ b/modules/library/src/main/scala/zio/elasticsearch/result/AggregationResult.scala @@ -65,6 +65,29 @@ final case class PercentileRanksAggregationResult private[elasticsearch] (values final case class PercentilesAggregationResult private[elasticsearch] (values: Map[String, Double]) extends AggregationResult +private[elasticsearch] sealed trait RangeAggregationResult extends AggregationResult + +private[elasticsearch] final case class RegularRangeAggregationBucketResult( + key: String, + to: Option[Double], + from: Option[Double], + docCount: Int +) + +private[elasticsearch] final case class KeyedRangeAggregationBucketResult( + to: Option[Double], + from: Option[Double], + docCount: Int +) + +private[elasticsearch] final case class RegularRangeAggregationResult( + buckets: Chunk[RegularRangeAggregationBucketResult] +) extends RangeAggregationResult + +private[elasticsearch] final case class KeyedRangeAggregationResult( + buckets: Map[String, KeyedRangeAggregationBucketResult] +) extends RangeAggregationResult + final case class StatsAggregationResult private[elasticsearch] ( count: Int, min: Double, diff --git a/modules/library/src/test/scala/zio/elasticsearch/ElasticAggregationSpec.scala b/modules/library/src/test/scala/zio/elasticsearch/ElasticAggregationSpec.scala index 202950431..cd31b3938 100644 --- a/modules/library/src/test/scala/zio/elasticsearch/ElasticAggregationSpec.scala +++ b/modules/library/src/test/scala/zio/elasticsearch/ElasticAggregationSpec.scala @@ -340,6 +340,103 @@ object ElasticAggregationSpec extends ZIOSpecDefault { ) ) }, + test("range") { + val aggregationTo = rangeAggregation("aggregation1", TestDocument.intField, SingleRange.to(23.9)) + val aggregationFrom = rangeAggregation("aggregation2", TestDocument.intField, SingleRange.from(2)) + val aggregationFromTo = + rangeAggregation("aggregation3", "testField", SingleRange.from(4).to(344.0)) + val aggregationRegular = rangeAggregation( + "aggregation4", + "testField", + SingleRange.to(23.9), + SingleRange.from(3), + SingleRange.to(12).from(0) + ) + val aggregationKeyed = rangeAggregation( + "aggregation5", + TestDocument.intField, + SingleRange.from(30).to(60), + SingleRange.from(60).to(100) + ).asKeyed + val aggregationNamedKeyed = rangeAggregation( + "aggregation6", + "testField", + SingleRange.from(30).to(60).key("Low"), + SingleRange.from(60).to(100).key("High") + ).asKeyed + + assert(aggregationTo)( + equalTo( + Range( + "aggregation1", + "testField", + Chunk.fromIterable(List(SingleRange.to(23.9))), + None + ) + ) + ) && assert(aggregationFrom)( + equalTo( + Range( + "aggregation2", + "testField", + Chunk.fromIterable(List(SingleRange.from(2.0))), + None + ) + ) + ) && assert(aggregationFromTo)( + equalTo( + Range( + "aggregation3", + "testField", + Chunk.fromIterable(List(SingleRange(from = 4, to = 344.0))), + None + ) + ) + ) && assert(aggregationRegular)( + equalTo( + Range( + "aggregation4", + "testField", + Chunk.fromIterable( + List( + SingleRange.to(23.9), + SingleRange.from(3), + SingleRange.to(12).from(0) + ) + ), + None + ) + ) + ) && assert(aggregationKeyed)( + equalTo( + Range( + "aggregation5", + "testField", + Chunk.fromIterable( + List( + SingleRange.from(30).to(60), + SingleRange.from(60).to(100) + ) + ), + Some(true) + ) + ) + ) && assert(aggregationNamedKeyed)( + equalTo( + Range( + "aggregation6", + "testField", + Chunk.fromIterable( + List( + SingleRange.from(30).to(60).key("Low"), + SingleRange.from(60).to(100).key("High") + ) + ), + Some(true) + ) + ) + ) + }, test("stats") { val aggregation = statsAggregation("aggregation", "testField") val aggregationTs = statsAggregation("aggregation", TestDocument.intField) @@ -1266,6 +1363,123 @@ object ElasticAggregationSpec extends ZIOSpecDefault { assert(aggregationWithMissing.toJson)(equalTo(expectedWithMissing.toJson)) && assert(aggregationWithAllParams.toJson)(equalTo(expectedWithAllParams.toJson)) }, + test("range") { + val aggregationTo = rangeAggregation("aggregation1", "testField", SingleRange.to(23.9)) + val aggregationFrom = rangeAggregation("aggregation2", "testField", SingleRange.from(2)) + val aggregationFromTo = + rangeAggregation("aggregation3", "testField", SingleRange.from(4).to(344.0)) + val aggregationRegular: RangeAggregation = rangeAggregation( + "aggregation4", + TestDocument.intField, + SingleRange.to(23.9), + SingleRange.from(3), + SingleRange.to(12).from(0) + ) + val aggregationKeyed = rangeAggregation( + "aggregation5", + TestDocument.intField, + SingleRange.from(30).to(60), + SingleRange.from(60).to(100) + ).asKeyed + val aggregationNamedKeyed = rangeAggregation( + "aggregation6", + TestDocument.intField, + SingleRange.from(30).to(60).key("Low"), + SingleRange.from(60).to(100).key("High") + ).asKeyed + + val expectedTo = + """ + |{ + | "aggregation1": { + | "range": { + | "field": "testField", + | "ranges": [ + | { "to": 23.9 } + | ] + | } + | } + |} + |""".stripMargin + val expectedFrom = + """ + |{ + | "aggregation2": { + | "range": { + | "field": "testField", + | "ranges": [ + | { "from": 2.0 } + | ] + | } + | } + |} + |""".stripMargin + val expectedFromTo = + """ + |{ + | "aggregation3": { + | "range": { + | "field": "testField", + | "ranges": [ + | { "from": 4.0, "to": 344.0 } + | ] + | } + | } + |} + |""".stripMargin + val expectedRegular = + """ + |{ + | "aggregation4": { + | "range": { + | "field": "testField", + | "ranges": [ + | { "to": 23.9 }, + | { "from": 3.0 }, + | { "from": 0.0, "to": 12.0 } + | ] + | } + | } + |} + |""".stripMargin + val expectedKeyed = + """ + |{ + | "aggregation5": { + | "range": { + | "field": "testField", + | "keyed": true, + | "ranges": [ + | { "from": 30.0, "to": 60.0 }, + | { "from": 60.0, "to": 100.0 } + | ] + | } + | } + |} + |""".stripMargin + val expectedNamedKeyed = + """ + |{ + | "aggregation6": { + | "range": { + | "field": "testField", + | "keyed": true, + | "ranges": [ + | { "from": 30.0, "to": 60.0, "key": "Low" }, + | { "from": 60.0, "to": 100.0, "key": "High" } + | ] + | } + | } + |} + |""".stripMargin + + assert(aggregationTo.toJson)(equalTo(expectedTo.toJson)) && + assert(aggregationFrom.toJson)(equalTo(expectedFrom.toJson)) && + assert(aggregationFromTo.toJson)(equalTo(expectedFromTo.toJson)) && + assert(aggregationRegular.toJson)(equalTo(expectedRegular.toJson)) && + assert(aggregationKeyed.toJson)(equalTo(expectedKeyed.toJson)) && + assert(aggregationNamedKeyed.toJson)(equalTo(expectedNamedKeyed.toJson)) + }, test("stats") { val aggregation = statsAggregation("aggregation", "testField") val aggregationTs = statsAggregation("aggregation", TestDocument.intField) diff --git a/website/sidebars.js b/website/sidebars.js index 8c11fe381..df607d201 100644 --- a/website/sidebars.js +++ b/website/sidebars.js @@ -58,6 +58,7 @@ module.exports = { 'overview/aggregations/elastic_aggregation_missing', 'overview/aggregations/elastic_aggregation_percentile_ranks', 'overview/aggregations/elastic_aggregation_percentiles', + 'overview/aggregations/elastic_aggregation_range', 'overview/aggregations/elastic_aggregation_stats', 'overview/aggregations/elastic_aggregation_sum', 'overview/aggregations/elastic_aggregation_terms',