Skip to content

Commit 26485bc

Browse files
committed
Fix empty intermediate aggregation results merge
1 parent 2115139 commit 26485bc

File tree

1 file changed

+44
-21
lines changed

1 file changed

+44
-21
lines changed

quickwit/quickwit-search/src/collector.rs

Lines changed: 44 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -860,8 +860,10 @@ impl Collector for QuickwitCollector {
860860
}
861861
}
862862

863-
fn map_error(err: postcard::Error) -> TantivyError {
864-
TantivyError::InternalError(format!("merge result Postcard error: {err}"))
863+
fn map_error(error: postcard::Error) -> TantivyError {
864+
TantivyError::InternalError(format!(
865+
"failed to merge intermediate aggregation results: Postcard error: {error}"
866+
))
865867
}
866868

867869
/// Merges a set of Leaf Results.
@@ -883,24 +885,26 @@ fn merge_intermediate_aggregation_result<'a>(
883885
Some(serialized)
884886
}
885887
Some(QuickwitAggregations::TantivyAggregations(_)) => {
886-
let fruits: Vec<IntermediateAggregationResults> = intermediate_aggregation_results
887-
.map(|intermediate_aggregation_result| {
888-
postcard::from_bytes(intermediate_aggregation_result).map_err(map_error)
889-
})
890-
.collect::<Result<_, _>>()?;
891-
892-
let mut fruit_iter = fruits.into_iter();
893-
if let Some(first_fruit) = fruit_iter.next() {
894-
let mut merged_fruit = first_fruit;
895-
for fruit in fruit_iter {
896-
merged_fruit.merge_fruits(fruit)?;
897-
}
898-
let serialized = postcard::to_allocvec(&merged_fruit).map_err(map_error)?;
899-
900-
Some(serialized)
901-
} else {
902-
None
903-
}
888+
let merged_opt = intermediate_aggregation_results
889+
.map(|bytes| postcard::from_bytes(bytes).map_err(map_error))
890+
.try_fold::<_, _, Result<_, TantivyError>>(
891+
None,
892+
|acc: Option<IntermediateAggregationResults>, fruits_res| {
893+
let fruits = fruits_res?;
894+
match acc {
895+
Some(mut merged_fruits) => {
896+
merged_fruits.merge_fruits(fruits)?;
897+
Ok(Some(merged_fruits))
898+
}
899+
None => Ok(Some(fruits)),
900+
}
901+
},
902+
)?;
903+
let serialized = match merged_opt {
904+
Some(fruit) => postcard::to_allocvec(&fruit).map_err(map_error)?,
905+
None => Vec::new(),
906+
};
907+
Some(serialized)
904908
}
905909
None => None,
906910
};
@@ -1293,10 +1297,12 @@ mod tests {
12931297
SortOrder, SortValue, SplitSearchError,
12941298
};
12951299
use tantivy::TantivyDocument;
1300+
use tantivy::aggregation::agg_req::Aggregations;
12961301
use tantivy::collector::Collector;
12971302

12981303
use super::{IncrementalCollector, make_merge_collector};
1299-
use crate::collector::top_k_partial_hits;
1304+
use crate::QuickwitAggregations;
1305+
use crate::collector::{merge_intermediate_aggregation_result, top_k_partial_hits};
13001306

13011307
#[test]
13021308
fn test_merge_partial_hits_no_tie() {
@@ -2002,4 +2008,21 @@ mod tests {
20022008
);
20032009
// TODO would be nice to test aggregation too.
20042010
}
2011+
2012+
#[test]
2013+
fn test_merge_empty_intermediate_aggregation_result() {
2014+
let merged = merge_intermediate_aggregation_result(&None, std::iter::empty()).unwrap();
2015+
assert!(merged.is_none());
2016+
2017+
let aggregations_json = r#"{
2018+
"avg_price": { "avg": { "field": "price" } }
2019+
}"#;
2020+
let tantivy_aggregations: Aggregations = serde_json::from_str(aggregations_json).unwrap();
2021+
let quickwit_aggregations = QuickwitAggregations::TantivyAggregations(tantivy_aggregations);
2022+
let merged =
2023+
merge_intermediate_aggregation_result(&Some(quickwit_aggregations), std::iter::empty())
2024+
.unwrap()
2025+
.unwrap();
2026+
assert!(merged.is_empty());
2027+
}
20052028
}

0 commit comments

Comments
 (0)