diff --git a/quickwit/quickwit-indexing/src/test_utils.rs b/quickwit/quickwit-indexing/src/test_utils.rs index 40bc45aa809..74e41493607 100644 --- a/quickwit/quickwit-indexing/src/test_utils.rs +++ b/quickwit/quickwit-indexing/src/test_utils.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::num::NonZeroUsize; +use std::ops::RangeInclusive; use std::str::FromStr; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -251,8 +252,16 @@ pub struct MockSplitBuilder { impl MockSplitBuilder { pub fn new(split_id: &str) -> Self { + Self::new_with_time_range(split_id, Some(121000..=130198)) + } + + pub fn new_with_time_range(split_id: &str, time_range: Option<RangeInclusive<i64>>) -> Self { Self { - split_metadata: mock_split_meta(split_id, &IndexUid::for_test("test-index", 0)), + split_metadata: mock_split_meta( + split_id, + &IndexUid::for_test("test-index", 0), + time_range, + ), } } @@ -277,14 +286,18 @@ pub fn mock_split(split_id: &str) -> Split { } /// Mock split meta helper. -pub fn mock_split_meta(split_id: &str, index_uid: &IndexUid) -> SplitMetadata { +pub fn mock_split_meta( + split_id: &str, + index_uid: &IndexUid, + time_range: Option<RangeInclusive<i64>>, +) -> SplitMetadata { SplitMetadata { index_uid: index_uid.clone(), split_id: split_id.to_string(), partition_id: 13u64, num_docs: if split_id == "split1" { 1_000_000 } else { 10 }, uncompressed_docs_size_in_bytes: 256, - time_range: Some(121000..=130198), + time_range, create_timestamp: 0, footer_offsets: 700..800, ..Default::default() diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 1ed858376c6..3d741c44c10 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -462,7 +462,12 @@ async fn leaf_search_single_split( // This may be the case for AllQuery with a sort by date and time filter, where the current // split can't have better results. 
// - if is_metadata_count_request_with_ast(&query_ast, &search_request) { + if is_metadata_count_request_with_ast( + &query_ast, + &search_request, + split.timestamp_start, + split.timestamp_end, + ) { return Ok(get_leaf_resp_from_count(split.num_docs)); } @@ -534,7 +539,12 @@ async fn leaf_search_single_split( check_optimize_search_request(&mut search_request, &split, &split_filter); collector.update_search_param(&search_request); let mut leaf_search_response: LeafSearchResponse = - if is_metadata_count_request_with_ast(&query_ast, &search_request) { + if is_metadata_count_request_with_ast( + &query_ast, + &search_request, + split.timestamp_start, + split.timestamp_end, + ) { get_leaf_resp_from_count(searcher.num_docs()) } else if collector.is_count_only() { let count = query.count(&searcher)? as u64; diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 7ab356e1c58..411cc6d28ad 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -635,9 +635,13 @@ async fn search_partial_hits_phase_with_scroll( /// metadata count. /// /// This is done by exclusion, so we will need to keep it up to date if fields are added. 
-pub fn is_metadata_count_request(request: &SearchRequest) -> bool { +pub fn is_metadata_count_request(request: &SearchRequest, split: &SplitMetadata) -> bool { let query_ast: QueryAst = serde_json::from_str(&request.query_ast).unwrap(); - is_metadata_count_request_with_ast(&query_ast, request) + + let start_time = split.time_range.as_ref().map(|x| x.start()).copied(); + let end_time = split.time_range.as_ref().map(|x| x.end()).copied(); + + is_metadata_count_request_with_ast(&query_ast, request, start_time, end_time) } /// Check if the request is a count request without any filters, so we can just return the split @@ -646,7 +650,12 @@ pub fn is_metadata_count_request(request: &SearchRequest) -> bool { /// This is done by exclusion, so we will need to keep it up to date if fields are added. /// /// The passed query_ast should match the serialized on in request. -pub fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &SearchRequest) -> bool { +pub fn is_metadata_count_request_with_ast( + query_ast: &QueryAst, + request: &SearchRequest, + split_start_timestamp: Option<i64>, + split_end_timestamp: Option<i64>, +) -> bool { if query_ast != &QueryAst::MatchAll { return false; } @@ -654,14 +663,17 @@ pub fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &Search return false; } - // If the start and end timestamp encompass the whole split, it is still a count query. - // We remove this currently on the leaf level, but not yet on the root level. - // There's a small advantage when we would do this on the root level, since we have the - // counts available on the split. On the leaf it is currently required to open the split - // to get the count. 
- if request.start_timestamp.is_some() || request.end_timestamp.is_some() { - return false; + match (request.start_timestamp, split_start_timestamp) { + (Some(request_start), Some(split_start)) if split_start >= request_start => {} + (Some(_), _) => return false, + (None, _) => {} } + match (request.end_timestamp, split_end_timestamp) { + (Some(request_end), Some(split_end)) if split_end < request_end => {} + (Some(_), _) => return false, + (None, _) => {} + } + if request.aggregation_request.is_some() || !request.snippet_fields.is_empty() { return false; } @@ -669,19 +681,16 @@ pub fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &Search } /// Get a leaf search response that returns the num_docs of the split -pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> { - split_metadatas - .iter() - .map(|metadata| LeafSearchResponse { - num_hits: metadata.num_docs as u64, - partial_hits: Vec::new(), - failed_splits: Vec::new(), - num_attempted_splits: 1, - num_successful_splits: 1, - intermediate_aggregation_result: None, - resource_stats: None, - }) - .collect() +pub fn get_count_from_metadata(metadata: &SplitMetadata) -> LeafSearchResponse { + LeafSearchResponse { + num_hits: metadata.num_docs as u64, + partial_hits: Vec::new(), + failed_splits: Vec::new(), + num_attempted_splits: 1, + num_successful_splits: 1, + intermediate_aggregation_result: None, + resource_stats: None, + } } /// Returns true if the query is particularly memory intensive. 
@@ -729,26 +738,31 @@ pub(crate) async fn search_partial_hits_phase( split_metadatas: &[SplitMetadata], cluster_client: &ClusterClient, ) -> crate::Result<LeafSearchResponse> { - let leaf_search_responses: Vec<LeafSearchResponse> = - if is_metadata_count_request(search_request) { - get_count_from_metadata(split_metadatas) + let mut leaf_search_responses: Vec<LeafSearchResponse> = + Vec::with_capacity(split_metadatas.len()); + let mut leaf_search_jobs = Vec::new(); + for split in split_metadatas { + if is_metadata_count_request(search_request, split) { + leaf_search_responses.push(get_count_from_metadata(split)); } else { - let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect(); - let assigned_leaf_search_jobs = cluster_client - .search_job_placer - .assign_jobs(jobs, &HashSet::default()) - .await?; - let mut leaf_request_tasks = Vec::new(); - for (client, client_jobs) in assigned_leaf_search_jobs { - let leaf_request = jobs_to_leaf_request( - search_request, - indexes_metas_for_leaf_search, - client_jobs, - )?; - leaf_request_tasks.push(cluster_client.leaf_search(leaf_request, client.clone())); - } - try_join_all(leaf_request_tasks).await? 
- }; + leaf_search_jobs.push(SearchJob::from(split)); + } + } + + if !leaf_search_jobs.is_empty() { + let assigned_leaf_search_jobs = cluster_client + .search_job_placer + .assign_jobs(leaf_search_jobs, &HashSet::default()) + .await?; + let mut leaf_request_tasks = Vec::new(); + for (client, client_jobs) in assigned_leaf_search_jobs { + let leaf_request = + jobs_to_leaf_request(search_request, indexes_metas_for_leaf_search, client_jobs)?; + leaf_request_tasks.push(cluster_client.leaf_search(leaf_request, client.clone())); + } + let executed_leaf_search_responses = try_join_all(leaf_request_tasks).await?; + leaf_search_responses.extend(executed_leaf_search_responses); + } // Creates a collector which merges responses into one let merge_collector = @@ -5045,4 +5059,111 @@ mod tests { assert_eq!(search_response.failed_splits.len(), 1); Ok(()) } + + #[tokio::test] + async fn test_count_from_metastore_in_contained_time_range() -> anyhow::Result<()> { + let search_request = quickwit_proto::search::SearchRequest { + start_timestamp: Some(122_000), + end_timestamp: Some(129_000), + index_id_patterns: vec!["test-index".to_string()], + query_ast: serde_json::to_string(&QueryAst::MatchAll) + .expect("MatchAll should be JSON serializable."), + max_hits: 0, + ..Default::default() + }; + + let index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index"); + let index_uid = index_metadata.index_uid.clone(); + + let mut mock_metastore = MockMetastoreService::new(); + mock_metastore + .expect_list_indexes_metadata() + .returning(move |_q| { + Ok(ListIndexesMetadataResponse::for_test(vec![ + index_metadata.clone(), + ])) + }); + mock_metastore.expect_list_splits().returning(move |_req| { + let splits = vec![ + MockSplitBuilder::new_with_time_range("split_before", Some(100_000..=110_000)) + .with_index_uid(&index_uid) + .build(), + MockSplitBuilder::new_with_time_range( + "split_overlap_start", + Some(120_000..=123_000), + ) + .with_index_uid(&index_uid) + .build(), + 
MockSplitBuilder::new_with_time_range("split_overlap_end", Some(128_000..=140_000)) + .with_index_uid(&index_uid) + .build(), + MockSplitBuilder::new_with_time_range( + "split_covering_whole", + Some(100_000..=200_000), + ) + .with_index_uid(&index_uid) + .build(), + MockSplitBuilder::new_with_time_range("split_inside", Some(124_000..=126_000)) + .with_index_uid(&index_uid) + .build(), + ]; + let resp = ListSplitsResponse::try_from_splits(splits).unwrap(); + Ok(ServiceStream::from(vec![Ok(resp)])) + }); + + let mut mock_search = MockSearchService::new(); + mock_search + .expect_leaf_search() + .withf(|leaf_search_req| { + let mut expected = HashSet::new(); + + // Notice split_inside is not included. + expected.insert("split_before"); + expected.insert("split_covering_whole"); + expected.insert("split_overlap_end"); + expected.insert("split_overlap_start"); + + leaf_search_req.leaf_requests.len() == 1 + && leaf_search_req.leaf_requests[0] + .split_offsets + .iter() + .map(|s| s.split_id.as_str()) + .collect::<HashSet<_>>() + == expected + }) + .times(1) + .returning(|_| { + Ok(quickwit_proto::search::LeafSearchResponse { + num_hits: 5, + partial_hits: vec![], + failed_splits: Vec::new(), + num_attempted_splits: 0, + ..Default::default() + }) + }); + mock_search.expect_fetch_docs().returning(|fetch_req| { + Ok(quickwit_proto::search::FetchDocsResponse { + hits: get_doc_for_fetch_req(fetch_req), + }) + }); + + let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search)]); + let search_job_placer = SearchJobPlacer::new(searcher_pool); + let cluster_client = ClusterClient::new(search_job_placer); + + let ctx = SearcherContext::for_test(); + let resp = root_search( + &ctx, + search_request, + MetastoreServiceClient::from_mock(mock_metastore), + &cluster_client, + ) + .await?; + + assert_eq!(resp.num_hits, 15); + assert_eq!(resp.hits.len(), 0); + assert_eq!(resp.num_successful_splits, 1); + + Ok(()) + } } diff --git 
a/quickwit/rest-api-tests/scenarii/qw_search_api/0001_ts_range.yaml b/quickwit/rest-api-tests/scenarii/qw_search_api/0001_ts_range.yaml index 8c42620ca55..c94e6fc93a8 100644 --- a/quickwit/rest-api-tests/scenarii/qw_search_api/0001_ts_range.yaml +++ b/quickwit/rest-api-tests/scenarii/qw_search_api/0001_ts_range.yaml @@ -34,3 +34,19 @@ params: query: "ts:>=1684993002 AND ts:<1684993004" expected: num_hits: 2 +--- +endpoint: simple/search +params: + query: "*" + start_timestamp: 1684993001 + end_timestamp: 1684993004 +expected: + num_hits: 3 +--- +endpoint: simple/search +params: + query: "*" + start_timestamp: 1684993001 + end_timestamp: 1684993005 +expected: + num_hits: 4