Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions rust/lance-index/src/scalar/zonemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use arrow_array::{new_empty_array, ArrayRef, RecordBatch, UInt32Array, UInt64Arr
use arrow_schema::{DataType, Field};
use datafusion::execution::SendableRecordBatchStream;
use datafusion_common::ScalarValue;
use std::{collections::HashMap, sync::Arc};
use std::{collections::{HashMap, HashSet}, sync::Arc};

use super::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult};
use crate::scalar::FragReuseIndex;
Expand All @@ -54,17 +54,17 @@ const ZONEMAP_INDEX_VERSION: u32 = 0;

/// Basic stats about zonemap index
#[derive(Debug, PartialEq, Clone)]
struct ZoneMapStatistics {
min: ScalarValue,
max: ScalarValue,
null_count: u32,
pub struct ZoneMapStatistics {
pub min: ScalarValue,
pub max: ScalarValue,
pub null_count: u32,
// only apply to float type
nan_count: u32,
fragment_id: u64,
pub nan_count: u32,
pub fragment_id: u64,
// zone_start is the start row of the zone in the fragment, also known
// as local row offset
zone_start: u64,
zone_length: usize,
pub zone_start: u64,
pub zone_length: usize,
}

impl DeepSizeOf for ZoneMapStatistics {
Expand Down Expand Up @@ -128,6 +128,28 @@ impl DeepSizeOf for ZoneMapIndex {
}

impl ZoneMapIndex {
/// Returns a copy of every zone whose statistics indicate it may contain
/// values matching `query`.
///
/// Intended for building splits for engines like Spark, Trino, etc.
///
/// One comparison per zone is reported to `metrics` before evaluation starts.
///
/// # Errors
///
/// Propagates the first error returned while evaluating a zone against the
/// query; zones after the failing one are not evaluated.
///
/// # Panics
///
/// Panics if `query` is not a `SargableQuery`.
pub fn fetch_zones_for_query(
    &self,
    query: &dyn AnyQuery,
    metrics: &dyn MetricsCollector,
) -> Result<Vec<ZoneMapStatistics>> {
    metrics.record_comparisons(self.zones.len());
    let sargable = query.as_any().downcast_ref::<SargableQuery>().unwrap();

    // Collecting into `Result<Vec<_>>` stops at the first evaluation error,
    // exactly like `?` inside an explicit loop would.
    self.zones
        .iter()
        .filter_map(
            |candidate| match self.evaluate_zone_against_query(candidate, sargable) {
                Ok(true) => Some(Ok(candidate.clone())),
                Ok(false) => None,
                Err(err) => Some(Err(err)),
            },
        )
        .collect()
}

/// Evaluates whether a zone could potentially contain values matching the query
/// For NaN, total order is used here
/// reference: https://doc.rust-lang.org/std/primitive.f64.html#method.total_cmp
Expand Down Expand Up @@ -1052,8 +1074,10 @@ mod tests {

// Add missing imports for the tests
use crate::metrics::NoOpMetricsCollector;
use crate::Index; // Import Index trait to access calculate_included_frags
use roaring::RoaringBitmap; // Import RoaringBitmap for the test
use crate::Index;
// Import Index trait to access calculate_included_frags
use roaring::RoaringBitmap;
// Import RoaringBitmap for the test
use std::collections::Bound;

// Adds a _rowaddr column emulating each batch as a new fragment
Expand Down
129 changes: 126 additions & 3 deletions rust/lance/src/dataset/statistics.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

Check warning on line 3 in rust/lance/src/dataset/statistics.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset/statistics.rs
//! Module for statistics related to the dataset.

use std::{collections::HashMap, future::Future, sync::Arc};
use std::{collections::{HashMap, HashSet}, future::Future, sync::Arc};

use super::{fragment::FileFragment, Dataset};
use lance_core::Result;
use lance_index::scalar::AnyQuery;

Check warning on line 10 in rust/lance/src/dataset/statistics.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset/statistics.rs
use lance_index::{metrics::NoOpMetricsCollector, scalar::zonemap::{ZoneMapIndex, ZoneMapStatistics}};
use lance_io::scheduler::{ScanScheduler, SchedulerConfig};

use super::{fragment::FileFragment, Dataset};
use lance_table::{
format::RowIdMeta,
io::deletion::deletion_file_path,
};

/// Statistics about a single field in the dataset
pub struct FieldStatistics {
Expand All @@ -26,11 +31,39 @@
pub fields: Vec<FieldStatistics>,
}

/// A `Split` is a subset of a dataset that has been filtered by a query.
///
/// It holds the zonemap zones that may contain rows matching the query,
/// together with a mapping from each referenced fragment ID to the files
/// associated with that fragment.
#[derive(Debug, Clone)]
pub struct Split {
/// Zone statistics for the zones assigned to this split
pub zone_stats: Vec<ZoneMapStatistics>,
/// Map of fragment ID to all files associated with that fragment
pub files: HashMap<u64, FragmentFiles>,
}

/// Paths of the files that belong to a single fragment, grouped by role.
#[derive(Debug, Default, Clone)]
pub struct FragmentFiles {
// Data file paths; populated as `data/<file path>` by `build_fragment_files_mapping`.
pub data_files: Vec<String>,
// Deletion file path, present only when the fragment has a deletion file.
pub deletion_files: Vec<String>,
// Path of the external row-id file; left empty when row IDs are stored inline.
pub row_id_files: Vec<String>,
}

/// Extension trait exposing statistics and split planning on a dataset.
pub trait DatasetStatisticsExt {
    /// Get statistics about the data in the dataset
    fn calculate_data_stats(
        self: &Arc<Self>,
    ) -> impl Future<Output = Result<DataStatistics>> + Send;

    /// Get splits partitioned by target size with associated file mappings
    ///
    /// `query` selects the zones to include; `target_size` bounds the summed
    /// zone length placed in a single split.
    fn get_splits(
        self: &Arc<Self>,
        query: &dyn AnyQuery,
        target_size: usize,
    ) -> Result<Vec<Split>>;

    /// Build a mapping of fragment ID to FragmentFiles for the given zones
    fn build_fragment_files_mapping(
        &self,
        zones: &Vec<ZoneMapStatistics>,
    ) -> Result<HashMap<u64, FragmentFiles>>;
}

impl DatasetStatisticsExt for Dataset {
Expand Down Expand Up @@ -66,4 +99,94 @@
fields: field_stats,
})
}

Check warning on line 102 in rust/lance/src/dataset/statistics.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset/statistics.rs
fn get_splits(
self: &Arc<Self>,
query: &dyn AnyQuery,
target_size: usize
) -> Result<Vec<Split>> {
let index: ZoneMapIndex; // TODO: find out how to get access to the zonemap index for all? columns
let mut filtered_zones = index.fetch_zones_for_query(query, &NoOpMetricsCollector)?;
// Sort by fragment ID to co-locate overlapping zones

Check warning on line 110 in rust/lance/src/dataset/statistics.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset/statistics.rs
filtered_zones.sort_by(|z1, z2| {
if z1.fragment_id != z2.fragment_id {
return z1.fragment_id.cmp(&z2.fragment_id)
}
return z1.zone_start.cmp(&z2.zone_start);
});

let mut splits = Vec::new();

Check warning on line 118 in rust/lance/src/dataset/statistics.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset/statistics.rs
let mut current_split_zones = Vec::new();
let mut current_size = 0;

for zone in filtered_zones {
let zone_size = zone.zone_length;

// TODO: need to confirm if this is right
if current_size > 0 && current_size + zone_size > target_size {
let files = self.build_fragment_files_mapping(&current_split_zones)?;
splits.push(Split {
zone_stats: current_split_zones,
files,
});

current_split_zones = vec![zone];
current_size = zone_size;
} else {
current_split_zones.push(zone);
current_size += zone_size;
}
}

if !current_split_zones.is_empty() {
let files = self.build_fragment_files_mapping(&current_split_zones)?;
splits.push(Split {
zone_stats: current_split_zones,
files,

Check warning on line 145 in rust/lance/src/dataset/statistics.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset/statistics.rs
});
}

Ok(splits)
}

/// Build a mapping of fragment ID to FragmentFiles for the given zones

Check warning on line 152 in rust/lance/src/dataset/statistics.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset/statistics.rs
/// TODO: maybe move to fragment.rs or similar
fn build_fragment_files_mapping(&self, zones: &Vec<ZoneMapStatistics>) -> Result<HashMap<u64, FragmentFiles>> {
let fragment_ids: HashSet<u64> = zones
.iter()
.map(|zone| zone.fragment_id)
.collect();

let mut files_mapping = HashMap::new();
for fragment_id in fragment_ids {
if let Some(fragment) = self.get_fragment(fragment_id as usize) {
let frag_metadata = fragment.metadata();

Check warning on line 163 in rust/lance/src/dataset/statistics.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset/statistics.rs
let mut fragment_files = FragmentFiles::default();

for data_file in &frag_metadata.files {
fragment_files.data_files.push(format!("data/{}", data_file.path));
}

if let Some(deletion_file) = &frag_metadata.deletion_file {
let deletion_path = deletion_file_path(
&self.base,
fragment_id,
deletion_file
);
fragment_files.deletion_files.push(deletion_path.to_string());
}

if let Some(row_id_meta) = &frag_metadata.row_id_meta {
if let RowIdMeta::External(external_file) = row_id_meta {
fragment_files.row_id_files.push(external_file.path.clone());
}

Check warning on line 182 in rust/lance/src/dataset/statistics.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance/src/dataset/statistics.rs
// Inline row IDs don't have separate files, so we skip them
}

files_mapping.insert(fragment_id, fragment_files);
}
}

Ok(files_mapping)
}
}
Loading