
Get count from split metadata on simple time range query #5758


Open · wants to merge 2 commits into main
19 changes: 16 additions & 3 deletions quickwit/quickwit-indexing/src/test_utils.rs
@@ -13,6 +13,7 @@
// limitations under the License.

use std::num::NonZeroUsize;
+use std::ops::RangeInclusive;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
@@ -251,8 +252,16 @@ pub struct MockSplitBuilder {

impl MockSplitBuilder {
pub fn new(split_id: &str) -> Self {
+        Self::new_with_time_range(split_id, Some(121000..=130198))
+    }
+
+    pub fn new_with_time_range(split_id: &str, time_range: Option<RangeInclusive<i64>>) -> Self {
         Self {
-            split_metadata: mock_split_meta(split_id, &IndexUid::for_test("test-index", 0)),
+            split_metadata: mock_split_meta(
+                split_id,
+                &IndexUid::for_test("test-index", 0),
+                time_range,
+            ),
}
}

@@ -277,14 +286,18 @@ pub fn mock_split(split_id: &str) -> Split {
}

/// Mock split meta helper.
-pub fn mock_split_meta(split_id: &str, index_uid: &IndexUid) -> SplitMetadata {
+pub fn mock_split_meta(
+    split_id: &str,
+    index_uid: &IndexUid,
+    time_range: Option<RangeInclusive<i64>>,
+) -> SplitMetadata {
SplitMetadata {
index_uid: index_uid.clone(),
split_id: split_id.to_string(),
partition_id: 13u64,
num_docs: if split_id == "split1" { 1_000_000 } else { 10 },
uncompressed_docs_size_in_bytes: 256,
-        time_range: Some(121000..=130198),
+        time_range,
create_timestamp: 0,
footer_offsets: 700..800,
..Default::default()
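Note: a quick sketch of how the extended builder can be used, based only on the signatures above (the split ids here are made up). The unchanged new constructor keeps the old default range, so existing tests are unaffected:

    // Hypothetical usage of the helpers changed above.
    let legacy = MockSplitBuilder::new("split1").build(); // defaults to 121000..=130198
    let bounded = MockSplitBuilder::new_with_time_range("split2", Some(124_000..=126_000)).build();
    let open_ended = MockSplitBuilder::new_with_time_range("split3", None).build(); // no time range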
14 changes: 12 additions & 2 deletions quickwit/quickwit-search/src/leaf.rs
@@ -462,7 +462,12 @@ async fn leaf_search_single_split(
// This may be the case for AllQuery with a sort by date and time filter, where the current
// split can't have better results.
//
-    if is_metadata_count_request_with_ast(&query_ast, &search_request) {
+    if is_metadata_count_request_with_ast(
+        &query_ast,
+        &search_request,
+        split.timestamp_start,
+        split.timestamp_end,
+    ) {
return Ok(get_leaf_resp_from_count(split.num_docs));
}

@@ -534,7 +539,12 @@ async fn leaf_search_single_split(
check_optimize_search_request(&mut search_request, &split, &split_filter);
collector.update_search_param(&search_request);
let mut leaf_search_response: LeafSearchResponse =
-        if is_metadata_count_request_with_ast(&query_ast, &search_request) {
+        if is_metadata_count_request_with_ast(
+            &query_ast,
+            &search_request,
+            split.timestamp_start,
+            split.timestamp_end,
+        ) {
get_leaf_resp_from_count(searcher.num_docs())
} else if collector.is_count_only() {
let count = query.count(&searcher)? as u64;
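Note: the leaf now threads each split's own timestamp bounds into the count-request check, so a pure count query whose time filter fully covers the split is answered from the split's document count without opening the split at all. A minimal standalone sketch of the containment rule (the function name is made up; the logic mirrors the match arms added in root.rs below):

    fn request_covers_split(
        request_start: Option<i64>,
        request_end: Option<i64>,
        split_start: Option<i64>,
        split_end: Option<i64>,
    ) -> bool {
        // A request bound of None means "unbounded on that side".
        let start_ok = match (request_start, split_start) {
            (None, _) => true,
            (Some(req_start), Some(split_start)) => split_start >= req_start,
            (Some(_), None) => false,
        };
        // The request end bound is exclusive, hence the strict comparison.
        let end_ok = match (request_end, split_end) {
            (None, _) => true,
            (Some(req_end), Some(split_end)) => split_end < req_end,
            (Some(_), None) => false,
        };
        start_ok && end_ok
    }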
205 changes: 163 additions & 42 deletions quickwit/quickwit-search/src/root.rs
@@ -635,9 +635,13 @@ async fn search_partial_hits_phase_with_scroll(
/// metadata count.
///
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
-pub fn is_metadata_count_request(request: &SearchRequest) -> bool {
+pub fn is_metadata_count_request(request: &SearchRequest, split: &SplitMetadata) -> bool {
     let query_ast: QueryAst = serde_json::from_str(&request.query_ast).unwrap();
-    is_metadata_count_request_with_ast(&query_ast, request)
+
+    let start_time = split.time_range.as_ref().map(|x| x.start()).copied();
+    let end_time = split.time_range.as_ref().map(|x| x.end()).copied();
+
+    is_metadata_count_request_with_ast(&query_ast, request, start_time, end_time)
}

/// Check if the request is a count request without any filters, so we can just return the split
@@ -646,42 +650,47 @@ pub fn is_metadata_count_request(request: &SearchRequest) -> bool {
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
///
/// The passed query_ast should match the serialized one in the request.
-pub fn is_metadata_count_request_with_ast(query_ast: &QueryAst, request: &SearchRequest) -> bool {
+pub fn is_metadata_count_request_with_ast(
+    query_ast: &QueryAst,
+    request: &SearchRequest,
+    split_start_timestamp: Option<i64>,
+    split_end_timestamp: Option<i64>,
+) -> bool {
if query_ast != &QueryAst::MatchAll {
return false;
}
if request.max_hits != 0 {
return false;
}

    // If the start and end timestamp encompass the whole split, it is still a count query.
-    // We remove this currently on the leaf level, but not yet on the root level.
-    // There's a small advantage when we would do this on the root level, since we have the
-    // counts available on the split. On the leaf it is currently required to open the split
-    // to get the count.
-    if request.start_timestamp.is_some() || request.end_timestamp.is_some() {
-        return false;
+    match (request.start_timestamp, split_start_timestamp) {
+        (Some(request_start), Some(split_start)) if split_start >= request_start => {}
+        (Some(_), _) => return false,
+        (None, _) => {}
     }
+    match (request.end_timestamp, split_end_timestamp) {
+        (Some(request_end), Some(split_end)) if split_end < request_end => {}
+        (Some(_), _) => return false,
+        (None, _) => {}
+    }

if request.aggregation_request.is_some() || !request.snippet_fields.is_empty() {
return false;
}
true
}

/// Get a leaf search response that returns the num_docs of the split
-pub fn get_count_from_metadata(split_metadatas: &[SplitMetadata]) -> Vec<LeafSearchResponse> {
-    split_metadatas
-        .iter()
-        .map(|metadata| LeafSearchResponse {
-            num_hits: metadata.num_docs as u64,
-            partial_hits: Vec::new(),
-            failed_splits: Vec::new(),
-            num_attempted_splits: 1,
-            num_successful_splits: 1,
-            intermediate_aggregation_result: None,
-            resource_stats: None,
-        })
-        .collect()
+pub fn get_count_from_metadata(metadata: &SplitMetadata) -> LeafSearchResponse {
+    LeafSearchResponse {
+        num_hits: metadata.num_docs as u64,
+        partial_hits: Vec::new(),
+        failed_splits: Vec::new(),
+        num_attempted_splits: 1,
+        num_successful_splits: 1,
+        intermediate_aggregation_result: None,
+        resource_stats: None,
+    }
 }

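Note: to make the two match blocks above concrete, take the fixture values from the test below. For a request with start_timestamp 122_000 and end_timestamp 129_000, a split spanning 124_000..=126_000 passes both checks and is counted from metadata, while a split spanning 120_000..=123_000 fails the start check and has to be searched:

    // 124_000 >= 122_000 and 126_000 < 129_000: fully contained, count from metadata.
    // 120_000 <  122_000: the split starts before the request window, so it is searched.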
/// Returns true if the query is particularly memory intensive.
@@ -729,26 +738,31 @@ pub(crate) async fn search_partial_hits_phase(
split_metadatas: &[SplitMetadata],
cluster_client: &ClusterClient,
) -> crate::Result<LeafSearchResponse> {
-    let leaf_search_responses: Vec<LeafSearchResponse> =
-        if is_metadata_count_request(search_request) {
-            get_count_from_metadata(split_metadatas)
+    let mut leaf_search_responses: Vec<LeafSearchResponse> =
+        Vec::with_capacity(split_metadatas.len());
+    let mut leaf_search_jobs = Vec::new();
+    for split in split_metadatas {
+        if is_metadata_count_request(search_request, split) {
+            leaf_search_responses.push(get_count_from_metadata(split));
         } else {
-            let jobs: Vec<SearchJob> = split_metadatas.iter().map(SearchJob::from).collect();
-            let assigned_leaf_search_jobs = cluster_client
-                .search_job_placer
-                .assign_jobs(jobs, &HashSet::default())
-                .await?;
-            let mut leaf_request_tasks = Vec::new();
-            for (client, client_jobs) in assigned_leaf_search_jobs {
-                let leaf_request = jobs_to_leaf_request(
-                    search_request,
-                    indexes_metas_for_leaf_search,
-                    client_jobs,
-                )?;
-                leaf_request_tasks.push(cluster_client.leaf_search(leaf_request, client.clone()));
-            }
-            try_join_all(leaf_request_tasks).await?
-        };
+            leaf_search_jobs.push(SearchJob::from(split));
+        }
+    }
+
+    if !leaf_search_jobs.is_empty() {
+        let assigned_leaf_search_jobs = cluster_client
+            .search_job_placer
+            .assign_jobs(leaf_search_jobs, &HashSet::default())
+            .await?;
+        let mut leaf_request_tasks = Vec::new();
+        for (client, client_jobs) in assigned_leaf_search_jobs {
+            let leaf_request =
+                jobs_to_leaf_request(search_request, indexes_metas_for_leaf_search, client_jobs)?;
+            leaf_request_tasks.push(cluster_client.leaf_search(leaf_request, client.clone()));
+        }
+        let executed_leaf_search_responses = try_join_all(leaf_request_tasks).await?;
+        leaf_search_responses.extend(executed_leaf_search_responses);
+    }

// Creates a collector which merges responses into one
let merge_collector =
@@ -5045,4 +5059,111 @@ mod tests {
assert_eq!(search_response.failed_splits.len(), 1);
Ok(())
}

#[tokio::test]
async fn test_count_from_metastore_in_contained_time_range() -> anyhow::Result<()> {
let search_request = quickwit_proto::search::SearchRequest {
start_timestamp: Some(122_000),
end_timestamp: Some(129_000),
index_id_patterns: vec!["test-index".to_string()],
query_ast: serde_json::to_string(&QueryAst::MatchAll)
.expect("MatchAll should be JSON serializable."),
max_hits: 0,
..Default::default()
};

let index_metadata = IndexMetadata::for_test("test-index", "ram:///test-index");
let index_uid = index_metadata.index_uid.clone();

let mut mock_metastore = MockMetastoreService::new();
mock_metastore
.expect_list_indexes_metadata()
.returning(move |_q| {
Ok(ListIndexesMetadataResponse::for_test(vec![
index_metadata.clone(),
]))
});
mock_metastore.expect_list_splits().returning(move |_req| {
let splits = vec![
MockSplitBuilder::new_with_time_range("split_before", Some(100_000..=110_000))
.with_index_uid(&index_uid)
.build(),
MockSplitBuilder::new_with_time_range(
"split_overlap_start",
Some(120_000..=123_000),
)
.with_index_uid(&index_uid)
.build(),
MockSplitBuilder::new_with_time_range("split_overlap_end", Some(128_000..=140_000))
.with_index_uid(&index_uid)
.build(),
MockSplitBuilder::new_with_time_range(
"split_covering_whole",
Some(100_000..=200_000),
)
.with_index_uid(&index_uid)
.build(),
MockSplitBuilder::new_with_time_range("split_inside", Some(124_000..=126_000))
.with_index_uid(&index_uid)
.build(),
];
let resp = ListSplitsResponse::try_from_splits(splits).unwrap();
Ok(ServiceStream::from(vec![Ok(resp)]))
});

let mut mock_search = MockSearchService::new();
mock_search
.expect_leaf_search()
.withf(|leaf_search_req| {
let mut expected = HashSet::new();

// Notice split_inside is not included.
expected.insert("split_before");
expected.insert("split_covering_whole");
expected.insert("split_overlap_end");
expected.insert("split_overlap_start");

leaf_search_req.leaf_requests.len() == 1
&& leaf_search_req.leaf_requests[0]
.split_offsets
.iter()
.map(|s| s.split_id.as_str())
.collect::<HashSet<&str>>()
== expected
})
.times(1)
.returning(|_| {
Ok(quickwit_proto::search::LeafSearchResponse {
num_hits: 5,
partial_hits: vec![],
failed_splits: Vec::new(),
num_attempted_splits: 0,
..Default::default()
})
});
mock_search.expect_fetch_docs().returning(|fetch_req| {
Ok(quickwit_proto::search::FetchDocsResponse {
hits: get_doc_for_fetch_req(fetch_req),
})
});

let searcher_pool = searcher_pool_for_test([("127.0.0.1:1001", mock_search)]);
let search_job_placer = SearchJobPlacer::new(searcher_pool);
let cluster_client = ClusterClient::new(search_job_placer);

let ctx = SearcherContext::for_test();
let resp = root_search(
&ctx,
search_request,
MetastoreServiceClient::from_mock(mock_metastore),
&cluster_client,
)
.await?;

assert_eq!(resp.num_hits, 15);
assert_eq!(resp.hits.len(), 0);
assert_eq!(resp.num_successful_splits, 1);

Ok(())
}
}
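Note on the expected numbers in this test: the mocked leaf search reports 5 hits for the four splits that only partially overlap the request window, while split_inside contributes its count straight from metadata. mock_split_meta assigns 10 docs to any split not named "split1" (see test_utils.rs above), so the root total is 5 + 10 = 15. num_successful_splits is 1 because the mocked leaf response reports zero successful splits, leaving only the metadata-counted split_inside.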
16 changes: 16 additions & 0 deletions quickwit/rest-api-tests/scenarii/qw_search_api/0001_ts_range.yaml
@@ -34,3 +34,19 @@ params:
query: "ts:>=1684993002 AND ts:<1684993004"
expected:
num_hits: 2
---
endpoint: simple/search
params:
query: "*"
start_timestamp: 1684993001
end_timestamp: 1684993004
expected:
num_hits: 3
---
endpoint: simple/search
params:
query: "*"
start_timestamp: 1684993001
end_timestamp: 1684993005
expected:
num_hits: 4
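Note: the two added scenarios also pin down the boundary semantics the optimization relies on. end_timestamp is exclusive, so widening it from 1684993004 to 1684993005 picks up exactly one more document (3 hits to 4), which is consistent with the strict split_end < request_end comparison introduced in root.rs.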