diff --git a/Cargo.lock b/Cargo.lock index a13a037af..accfd66b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5465,6 +5465,7 @@ dependencies = [ "arrow-schema", "async-stream", "async-trait", + "byteorder", "bytes", "datafusion", "datafusion-catalog", @@ -5481,6 +5482,7 @@ dependencies = [ "las-crs", "laz", "object_store", + "rayon", "sedona", "sedona-expr", "sedona-geometry", diff --git a/rust/sedona-pointcloud/Cargo.toml b/rust/sedona-pointcloud/Cargo.toml index 478e9ad31..921f30c8c 100644 --- a/rust/sedona-pointcloud/Cargo.toml +++ b/rust/sedona-pointcloud/Cargo.toml @@ -36,6 +36,7 @@ arrow-ipc = { workspace = true } arrow-schema = { workspace = true } async-stream = "0.3.6" async-trait = { workspace = true } +byteorder = { workspace = true } bytes = { workspace = true } datafusion-catalog = { workspace = true } datafusion-common = { workspace = true } @@ -51,6 +52,7 @@ las = { version = "0.9.10", features = ["laz"] } las-crs = { version = "1.0.0" } laz = "0.12.0" object_store = { workspace = true } +rayon = "1.11.0" sedona-expr = { workspace = true } sedona-geometry = { workspace = true } diff --git a/rust/sedona-pointcloud/src/las/builder.rs b/rust/sedona-pointcloud/src/las/builder.rs index 8e2e8a852..0f3960851 100644 --- a/rust/sedona-pointcloud/src/las/builder.rs +++ b/rust/sedona-pointcloud/src/las/builder.rs @@ -35,9 +35,10 @@ use geoarrow_array::{ use geoarrow_schema::Dimension; use las::{Header, Point}; -use crate::{ - las::{metadata::ExtraAttribute, options::LasExtraBytes, schema::try_schema_from_header}, - options::GeometryEncoding, +use crate::las::{ + metadata::ExtraAttribute, + options::{GeometryEncoding, LasExtraBytes}, + schema::try_schema_from_header, }; #[derive(Debug)] @@ -515,9 +516,9 @@ mod tests { use las::{point::Format, Builder, Writer}; use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; - use crate::{ - las::{options::LasExtraBytes, reader::LasFileReaderFactory}, - options::PointcloudOptions, + use crate::las::{ + 
options::{LasExtraBytes, LasOptions}, + reader::LasFileReaderFactory, }; #[tokio::test] @@ -544,7 +545,7 @@ mod tests { let file_reader = LasFileReaderFactory::new(Arc::new(store), None) .create_reader( PartitionedFile::new(location, object.size), - PointcloudOptions::default(), + LasOptions::default(), ) .unwrap(); let metadata = file_reader.get_metadata().await.unwrap(); @@ -578,7 +579,7 @@ mod tests { let file_reader = LasFileReaderFactory::new(Arc::new(store), None) .create_reader( PartitionedFile::new(location, object.size), - PointcloudOptions::default().with_las_extra_bytes(LasExtraBytes::Typed), + LasOptions::default().with_las_extra_bytes(LasExtraBytes::Typed), ) .unwrap(); let metadata = file_reader.get_metadata().await.unwrap(); diff --git a/rust/sedona-pointcloud/src/las/format.rs b/rust/sedona-pointcloud/src/las/format.rs index 48e070542..d2f682400 100644 --- a/rust/sedona-pointcloud/src/las/format.rs +++ b/rust/sedona-pointcloud/src/las/format.rs @@ -33,9 +33,9 @@ use datafusion_physical_plan::ExecutionPlan; use futures::{StreamExt, TryStreamExt}; use object_store::{ObjectMeta, ObjectStore}; -use crate::{ - las::{metadata::LasMetadataReader, reader::LasFileReaderFactory, source::LasSource}, - options::PointcloudOptions, +use crate::las::{ + metadata::LasMetadataReader, options::LasOptions, reader::LasFileReaderFactory, + source::LasSource, }; #[derive(Debug, Clone, Copy)] @@ -56,7 +56,7 @@ impl Extension { /// Factory struct used to create [LasFormat] pub struct LasFormatFactory { // inner options for LAS/LAZ - pub options: Option, + pub options: Option, extension: Extension, } @@ -70,7 +70,7 @@ impl LasFormatFactory { } /// Creates an instance of [LasFormatFactory] with customized default options - pub fn new_with(options: PointcloudOptions, extension: Extension) -> Self { + pub fn new_with(options: LasOptions, extension: Extension) -> Self { Self { options: Some(options), extension, @@ -87,8 +87,8 @@ impl FileFormatFactory for LasFormatFactory { let 
mut options = state .config_options() .extensions - .get::() - .or_else(|| state.table_options().extensions.get::()) + .get::() + .or_else(|| state.table_options().extensions.get::()) .cloned() .or(self.options.clone()) .unwrap_or_default(); @@ -129,7 +129,7 @@ impl fmt::Debug for LasFormatFactory { /// The LAS/LAZ `FileFormat` implementation #[derive(Debug)] pub struct LasFormat { - pub options: PointcloudOptions, + pub options: LasOptions, extension: Extension, } @@ -141,7 +141,7 @@ impl LasFormat { } } - pub fn with_options(mut self, options: PointcloudOptions) -> Self { + pub fn with_options(mut self, options: LasOptions) -> Self { self.options = options; self } @@ -195,7 +195,7 @@ impl FileFormat for LasFormat { Ok::<_, DataFusionError>((loc_path, schema)) }) .boxed() // Workaround https://github.com/rust-lang/rust/issues/64552 - // fetch schemas concurrently, if requested + // fetch schemas concurrently, if requested (note that this is not parallel) .buffered(state.config_options().execution.meta_fetch_concurrency) .try_collect() .await?; diff --git a/rust/sedona-pointcloud/src/las/metadata.rs b/rust/sedona-pointcloud/src/las/metadata.rs index ab9b40ac0..5bb8052ac 100644 --- a/rust/sedona-pointcloud/src/las/metadata.rs +++ b/rust/sedona-pointcloud/src/las/metadata.rs @@ -36,12 +36,10 @@ use las::{ use laz::laszip::ChunkTable; use object_store::{ObjectMeta, ObjectStore}; -use crate::{ - las::{ - schema::try_schema_from_header, - statistics::{chunk_statistics, LasStatistics}, - }, - options::PointcloudOptions, +use crate::las::{ + options::LasOptions, + schema::try_schema_from_header, + statistics::{chunk_statistics, LasStatistics}, }; /// LAS/LAZ chunk metadata @@ -92,7 +90,7 @@ pub struct LasMetadataReader<'a> { store: &'a dyn ObjectStore, object_meta: &'a ObjectMeta, file_metadata_cache: Option>, - options: PointcloudOptions, + options: LasOptions, } impl<'a> LasMetadataReader<'a> { @@ -115,18 +113,11 @@ impl<'a> LasMetadataReader<'a> { } /// set table 
options - pub fn with_options(mut self, options: PointcloudOptions) -> Self { + pub fn with_options(mut self, options: LasOptions) -> Self { self.options = options; self } - /// Fetch header - pub async fn fetch_header(&self) -> Result { - fetch_header(self.store, self.object_meta) - .await - .map_err(DataFusionError::External) - } - /// Fetch LAS/LAZ metadata from the remote object store pub async fn fetch_metadata(&self) -> Result, DataFusionError> { let Self { @@ -149,13 +140,9 @@ impl<'a> LasMetadataReader<'a> { return Ok(las_file_metadata); } - let header = self.fetch_header().await?; + let header = fetch_header(*store, object_meta).await?; let extra_attributes = extra_bytes_attributes(&header)?; - let chunk_table = if header.laz_vlr().is_ok() { - laz_chunk_table(*store, object_meta, &header).await? - } else { - las_chunk_table(&header).await? - }; + let chunk_table = fetch_chunk_table(*store, object_meta, &header).await?; let statistics = if options.collect_statistics { Some( chunk_statistics( @@ -164,6 +151,7 @@ impl<'a> LasMetadataReader<'a> { &chunk_table, &header, options.persist_statistics, + options.parallel_statistics_extraction, ) .await?, ) @@ -192,7 +180,7 @@ impl<'a> LasMetadataReader<'a> { let schema = try_schema_from_header( &metadata.header, self.options.geometry_encoding, - self.options.las.extra_bytes, + self.options.extra_bytes, )?; Ok(schema) @@ -241,7 +229,8 @@ impl<'a> LasMetadataReader<'a> { } } -async fn fetch_header( +/// Fetch the [Header] of a LAS/LAZ file +pub async fn fetch_header( store: &(impl ObjectStore + ?Sized), object_meta: &ObjectMeta, ) -> Result> { @@ -296,6 +285,7 @@ async fn fetch_header( Ok(builder.into_header()?) } +/// Extra attribute information (custom attributes in LAS/LAZ files) #[derive(Debug, Clone, PartialEq)] pub struct ExtraAttribute { pub data_type: DataType, @@ -368,6 +358,19 @@ fn extra_bytes_attributes( Ok(attributes) } +/// Fetch or generate chunk table metadata. 
+pub async fn fetch_chunk_table( + store: &(impl ObjectStore + ?Sized), + object_meta: &ObjectMeta, + header: &Header, +) -> Result, Box> { + if header.laz_vlr().is_ok() { + laz_chunk_table(store, object_meta, header).await + } else { + las_chunk_table(header).await + } +} + async fn laz_chunk_table( store: &(impl ObjectStore + ?Sized), object_meta: &ObjectMeta, @@ -482,7 +485,7 @@ mod tests { use las::{point::Format, Builder, Reader, Writer}; use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; - use crate::las::metadata::LasMetadataReader; + use crate::las::metadata::fetch_header; #[tokio::test] async fn header_basic_e2e() { @@ -503,14 +506,13 @@ mod tests { let store = LocalFileSystem::new(); let location = Path::from_filesystem_path(&tmp_path).unwrap(); let object_meta = store.head(&location).await.unwrap(); - let metadata_reader = LasMetadataReader::new(&store, &object_meta); // read with las `Reader` let reader = Reader::from_path(&tmp_path).unwrap(); assert_eq!( reader.header(), - &metadata_reader.fetch_header().await.unwrap() + &fetch_header(&store, &object_meta).await.unwrap() ); } } diff --git a/rust/sedona-pointcloud/src/las/opener.rs b/rust/sedona-pointcloud/src/las/opener.rs index 249e53925..941788eff 100644 --- a/rust/sedona-pointcloud/src/las/opener.rs +++ b/rust/sedona-pointcloud/src/las/opener.rs @@ -29,12 +29,10 @@ use futures::StreamExt; use sedona_expr::spatial_filter::SpatialFilter; use sedona_geometry::bounding_box::BoundingBox; -use crate::{ - las::{ - reader::{LasFileReader, LasFileReaderFactory}, - schema::try_schema_from_header, - }, - options::PointcloudOptions, +use crate::las::{ + options::LasOptions, + reader::{LasFileReader, LasFileReaderFactory}, + schema::try_schema_from_header, }; pub struct LasOpener { @@ -42,13 +40,18 @@ pub struct LasOpener { pub projection: Arc<[usize]>, /// Optional limit on the number of rows to read pub limit: Option, + /// Filter predicate for pruning pub predicate: Option>, /// Factory for 
instantiating LAS/LAZ reader pub file_reader_factory: Arc, /// Table options - pub options: PointcloudOptions, + pub options: LasOptions, /// Target batch size - pub(crate) batch_size: usize, + pub batch_size: usize, + /// Target partition count + pub partition_count: usize, + /// Partition to read + pub partition: usize, } impl FileOpener for LasOpener { @@ -56,6 +59,9 @@ impl FileOpener for LasOpener { let projection = self.projection.clone(); let limit = self.limit; let batch_size = self.batch_size; + let round_robin = self.options.round_robin_partitioning; + let partition_count = self.partition_count; + let partition = self.partition; let predicate = self.predicate.clone(); @@ -68,7 +74,7 @@ impl FileOpener for LasOpener { let schema = Arc::new(try_schema_from_header( &metadata.header, file_reader.options.geometry_encoding, - file_reader.options.las.extra_bytes, + file_reader.options.extra_bytes, )?); let pruning_predicate = predicate.and_then(|physical_expr| { @@ -117,6 +123,11 @@ impl FileOpener for LasOpener { let stream = async_stream::try_stream! 
{ for (i, chunk_meta) in metadata.chunk_table.iter().enumerate() { + // round robin + if round_robin && i % partition_count != partition { + continue; + } + // limit if let Some(limit) = limit { if row_count >= limit { @@ -187,10 +198,10 @@ mod tests { let ctx = SedonaContext::new_local_interactive().await.unwrap(); // ensure no faulty chunk pruning - ctx.sql("SET pointcloud.geometry_encoding = 'plain'") + ctx.sql("SET las.geometry_encoding = 'plain'") .await .unwrap(); - ctx.sql("SET pointcloud.collect_statistics = 'true'") + ctx.sql("SET las.collect_statistics = 'true'") .await .unwrap(); @@ -212,9 +223,7 @@ mod tests { .unwrap(); assert_eq!(count, 50000); - ctx.sql("SET pointcloud.geometry_encoding = 'wkb'") - .await - .unwrap(); + ctx.sql("SET las.geometry_encoding = 'wkb'").await.unwrap(); let count = ctx .sql(&format!("SELECT * FROM \"{path}\" WHERE ST_Intersects(geometry, ST_GeomFromText('POLYGON ((0 0, 0.7 0, 0.7 0.7, 0 0.7, 0 0))'))")) .await @@ -233,10 +242,10 @@ mod tests { let ctx = SedonaContext::new_local_interactive().await.unwrap(); // ensure no faulty chunk pruning - ctx.sql("SET pointcloud.geometry_encoding = 'plain'") + ctx.sql("SET las.geometry_encoding = 'plain'") .await .unwrap(); - ctx.sql("SET pointcloud.collect_statistics = 'true'") + ctx.sql("SET las.collect_statistics = 'true'") .await .unwrap(); @@ -258,9 +267,7 @@ mod tests { .unwrap(); assert_eq!(count, 50000); - ctx.sql("SET pointcloud.geometry_encoding = 'wkb'") - .await - .unwrap(); + ctx.sql("SET las.geometry_encoding = 'wkb'").await.unwrap(); let count = ctx .sql(&format!("SELECT * FROM \"{path}\" WHERE ST_Intersects(geometry, ST_GeomFromText('POLYGON ((0 0, 0.7 0, 0.7 0.7, 0 0.7, 0 0))'))")) .await @@ -270,4 +277,32 @@ mod tests { .unwrap(); assert_eq!(count, 50000); } + + #[tokio::test] + async fn round_robin_partitioning() { + // file with two clusters, one at 0.5 one at 1.0 + let path = "tests/data/large.laz"; + + let ctx = 
SedonaContext::new_local_interactive().await.unwrap(); + + let result1 = ctx + .sql(&format!("SELECT * FROM \"{path}\"")) + .await + .unwrap() + .collect() + .await + .unwrap(); + + ctx.sql("SET las.round_robin_partitioning = 'true'") + .await + .unwrap(); + let result2 = ctx + .sql(&format!("SELECT * FROM \"{path}\"")) + .await + .unwrap() + .collect() + .await + .unwrap(); + assert_eq!(result1, result2); + } } diff --git a/rust/sedona-pointcloud/src/las/options.rs b/rust/sedona-pointcloud/src/las/options.rs index de02628fe..a7c656e2e 100644 --- a/rust/sedona-pointcloud/src/las/options.rs +++ b/rust/sedona-pointcloud/src/las/options.rs @@ -18,11 +18,61 @@ use std::{fmt::Display, str::FromStr}; use datafusion_common::{ - config::{ConfigField, Visit}, - config_namespace, + config::{ConfigExtension, ConfigField, Visit}, error::DataFusionError, + extensions_options, }; +/// Geometry representation +#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)] +pub enum GeometryEncoding { + /// Use plain coordinates as three fields `x`, `y`, `z` with datatype Float64 encoding. + #[default] + Plain, + /// Resolves the coordinates to a field `geometry` with WKB encoding. + Wkb, + /// Resolves the coordinates to a field `geometry` with separated GeoArrow encoding. 
+ Native, +} + +impl Display for GeometryEncoding { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + GeometryEncoding::Plain => f.write_str("plain"), + GeometryEncoding::Wkb => f.write_str("wkb"), + GeometryEncoding::Native => f.write_str("native"), + } + } +} + +impl FromStr for GeometryEncoding { + type Err = String; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "plain" => Ok(Self::Plain), + "wkb" => Ok(Self::Wkb), + "native" => Ok(Self::Native), + s => Err(format!("Unable to parse from `{s}`")), + } + } +} + +impl ConfigField for GeometryEncoding { + fn visit(&self, v: &mut V, key: &str, _description: &'static str) { + v.some( + &format!("{key}.geometry_encoding"), + self, + "Specify point geometry encoding", + ); + } + + fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> { + *self = value.parse().map_err(DataFusionError::Configuration)?; + Ok(()) + } +} + /// LAS extra bytes handling #[derive(Clone, Copy, Default, PartialEq, Eq, Debug)] pub enum LasExtraBytes { @@ -73,16 +123,37 @@ impl ConfigField for LasExtraBytes { } } -config_namespace! { - /// The LAS config options +extensions_options! 
{ + /// LAS/LAZ configuration options + /// + /// * `geometry encoding`: plain (x, y, z), wkb or native (geoarrow) + /// * `collect statistics`: extract las/laz chunk statistics (requires a full scan on registration) + /// * `parallel statistics extraction`: extract statistics in parallel + /// * `persist statistics`: store statistics in a sidecar file for future reuse (requires write access) + /// * `round robin partitioning`: read chunks in parallel with round robin instead of byte range (default) + /// * `extra bytes`: las extra byte attributes handling, ignore, keep as binary blob, or typed pub struct LasOptions { + pub geometry_encoding: GeometryEncoding, default = GeometryEncoding::default() pub extra_bytes: LasExtraBytes, default = LasExtraBytes::default() + pub collect_statistics: bool, default = false + pub parallel_statistics_extraction: bool, default = false + pub persist_statistics: bool, default = false + pub round_robin_partitioning: bool, default = false } } +impl ConfigExtension for LasOptions { + const PREFIX: &'static str = "las"; +} + impl LasOptions { - pub fn with_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self { + pub fn with_geometry_encoding(mut self, geometry_encoding: GeometryEncoding) -> Self { + self.geometry_encoding = geometry_encoding; + self + } + + pub fn with_las_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self { self.extra_bytes = extra_bytes; self } @@ -97,13 +168,13 @@ mod test { prelude::{SessionConfig, SessionContext}, }; - use crate::{ - las::format::{Extension, LasFormatFactory}, - options::PointcloudOptions, + use crate::las::{ + format::{Extension, LasFormatFactory}, + options::LasOptions, }; fn setup_context() -> SessionContext { - let config = SessionConfig::new().with_option_extension(PointcloudOptions::default()); + let config = SessionConfig::new().with_option_extension(LasOptions::default()); let mut state = SessionStateBuilder::new().with_config(config).build(); let file_format = 
Arc::new(LasFormatFactory::new(Extension::Las)); @@ -135,12 +206,8 @@ mod test { assert_eq!(df.schema().fields().len(), 3); // overwrite options - ctx.sql("SET pointcloud.geometry_encoding = 'wkb'") - .await - .unwrap(); - ctx.sql("SET pointcloud.las.extra_bytes = 'blob'") - .await - .unwrap(); + ctx.sql("SET las.geometry_encoding = 'wkb'").await.unwrap(); + ctx.sql("SET las.extra_bytes = 'blob'").await.unwrap(); let df = ctx .sql("SELECT geometry, extra_bytes FROM 'tests/data/extra.las'") diff --git a/rust/sedona-pointcloud/src/las/reader.rs b/rust/sedona-pointcloud/src/las/reader.rs index 64939a19f..80411cfb5 100644 --- a/rust/sedona-pointcloud/src/las/reader.rs +++ b/rust/sedona-pointcloud/src/las/reader.rs @@ -36,12 +36,10 @@ use laz::{ }; use object_store::ObjectStore; -use crate::{ - las::{ - builder::RowBuilder, - metadata::{ChunkMeta, LasMetadata, LasMetadataReader}, - }, - options::PointcloudOptions, +use crate::las::{ + builder::RowBuilder, + metadata::{ChunkMeta, LasMetadata, LasMetadataReader}, + options::LasOptions, }; /// LAS/LAZ file reader factory @@ -66,7 +64,7 @@ impl LasFileReaderFactory { pub fn create_reader( &self, partitioned_file: PartitionedFile, - options: PointcloudOptions, + options: LasOptions, ) -> Result, DataFusionError> { Ok(Box::new(LasFileReader { partitioned_file, @@ -82,7 +80,7 @@ pub struct LasFileReader { partitioned_file: PartitionedFile, store: Arc, metadata_cache: Option>, - pub options: PointcloudOptions, + pub options: LasOptions, } impl LasFileReader { @@ -111,10 +109,7 @@ impl LasFileReader { let num_points = chunk_meta.num_points as usize; let mut builder = RowBuilder::new(num_points, header.clone()) .with_geometry_encoding(self.options.geometry_encoding) - .with_extra_attributes( - metadata.extra_attributes.clone(), - self.options.las.extra_bytes, - ); + .with_extra_attributes(metadata.extra_attributes.clone(), self.options.extra_bytes); // parse points if header.laz_vlr().is_ok() { @@ -187,7 +182,7 @@ pub fn 
record_decompressor( Ok(decompressor) } -pub(crate) fn read_point(buffer: R, header: &Header) -> Result { +fn read_point(buffer: R, header: &Header) -> Result { RawPoint::read_from(buffer, header.point_format()) .map(|raw_point| Point::new(raw_point, header.transforms())) .map_err(|e| DataFusionError::External(Box::new(e))) diff --git a/rust/sedona-pointcloud/src/las/schema.rs b/rust/sedona-pointcloud/src/las/schema.rs index c3e683944..f6e799fce 100644 --- a/rust/sedona-pointcloud/src/las/schema.rs +++ b/rust/sedona-pointcloud/src/las/schema.rs @@ -22,7 +22,7 @@ use geoarrow_schema::{CoordType, Crs, Dimension, Metadata, PointType, WkbType}; use las::Header; use las_crs::{get_epsg_from_geotiff_crs, get_epsg_from_wkt_crs_bytes}; -use crate::{las::options::LasExtraBytes, options::GeometryEncoding}; +use crate::las::options::{GeometryEncoding, LasExtraBytes}; // Arrow schema for LAS points pub fn try_schema_from_header( diff --git a/rust/sedona-pointcloud/src/las/source.rs b/rust/sedona-pointcloud/src/las/source.rs index 004d726cd..6f66b8237 100644 --- a/rust/sedona-pointcloud/src/las/source.rs +++ b/rust/sedona-pointcloud/src/las/source.rs @@ -15,22 +15,22 @@ // specific language governing permissions and limitations // under the License. 
-use std::{any::Any, sync::Arc}; +use std::{any::Any, iter, sync::Arc}; use datafusion_common::{config::ConfigOptions, error::DataFusionError, Statistics}; use datafusion_datasource::{ - file::FileSource, file_scan_config::FileScanConfig, file_stream::FileOpener, TableSchema, + file::FileSource, file_groups::FileGroupPartitioner, file_scan_config::FileScanConfig, + file_stream::FileOpener, source::DataSource, TableSchema, }; -use datafusion_physical_expr::{conjunction, PhysicalExpr}; +use datafusion_physical_expr::{conjunction, LexOrdering, PhysicalExpr}; use datafusion_physical_plan::{ filter_pushdown::{FilterPushdownPropagation, PushedDown}, metrics::ExecutionPlanMetricsSet, }; use object_store::ObjectStore; -use crate::{ - las::{format::Extension, opener::LasOpener, reader::LasFileReaderFactory}, - options::PointcloudOptions, +use crate::las::{ + format::Extension, opener::LasOpener, options::LasOptions, reader::LasFileReaderFactory, }; #[derive(Clone, Debug)] @@ -46,7 +46,7 @@ pub struct LasSource { /// Batch size configuration pub(crate) batch_size: Option, pub(crate) projected_statistics: Option, - pub(crate) options: PointcloudOptions, + pub(crate) options: LasOptions, pub(crate) extension: Extension, } @@ -64,7 +64,7 @@ impl LasSource { } } - pub fn with_options(mut self, options: PointcloudOptions) -> Self { + pub fn with_options(mut self, options: LasOptions) -> Self { self.options = options; self } @@ -80,7 +80,7 @@ impl FileSource for LasSource { &self, object_store: Arc, base_config: &FileScanConfig, - _partition: usize, + partition: usize, ) -> Arc { let projection = base_config .file_column_projection_indices() @@ -98,6 +98,8 @@ impl FileSource for LasSource { predicate: self.predicate.clone(), file_reader_factory, options: self.options.clone(), + partition_count: base_config.output_partitioning().partition_count(), + partition, }) } @@ -149,6 +151,52 @@ impl FileSource for LasSource { self.extension.as_str() } + fn repartitioned( + &self, + 
target_partitions: usize, + repartition_file_min_size: usize, + output_ordering: Option<LexOrdering>, + config: &FileScanConfig, + ) -> Result<Option<FileScanConfig>, DataFusionError> { + if output_ordering.is_none() && self.options.round_robin_partitioning { + // Custom round robin repartitioning + // + // The default way to partition a dataset to enable parallel reading + // by DataFusion is through splitting files by byte ranges into the + // number of target partitions. For selective queries on (partially) + // ordered datasets that support pruning, this can result in unequal + // resource use, as all the work is done on one partition while the + // rest is pruned. Additionally, this breaks the existing locality + // in the input when it is converted, as data from all partitions + // ends up in each output row group. This approach addresses these + // issues by partitioning the dataset using a round-robin scheme + // across sequential chunks. This improves selective query performance + // by more than half. + let mut config = config.clone(); + config.file_groups = config + .file_groups + .into_iter() + .flat_map(|fg| iter::repeat_n(fg, target_partitions)) + .collect(); + return Ok(Some(config)); + } else { + // Default byte range repartitioning + let repartitioned_file_groups_option = FileGroupPartitioner::new() + .with_target_partitions(target_partitions) + .with_repartition_file_min_size(repartition_file_min_size) + .with_preserve_order_within_groups(output_ordering.is_some()) + .repartition_file_groups(&config.file_groups); + + if let Some(repartitioned_file_groups) = repartitioned_file_groups_option { + let mut source = config.clone(); + source.file_groups = repartitioned_file_groups; + return Ok(Some(source)); + } + } + + Ok(None) + } + fn try_pushdown_filters( + &self, + filters: Vec>, diff --git a/rust/sedona-pointcloud/src/las/statistics.rs index 87f11abd6..36b2d4bfc 100644 --- a/rust/sedona-pointcloud/src/las/statistics.rs +++ 
b/rust/sedona-pointcloud/src/las/statistics.rs @@ -15,7 +15,11 @@ // specific language governing permissions and limitations // under the License. -use std::{collections::HashSet, io::Cursor, sync::Arc}; +use std::{ + collections::HashSet, + io::{Cursor, Read, Seek}, + sync::Arc, +}; use arrow_array::{ builder::PrimitiveBuilder, @@ -25,22 +29,20 @@ use arrow_array::{ }; use arrow_ipc::{reader::FileReader, writer::FileWriter}; use arrow_schema::{DataType, Field, Schema}; +use byteorder::{LittleEndian, ReadBytesExt}; use datafusion_common::{arrow::compute::concat_batches, Column, DataFusionError, ScalarValue}; use datafusion_pruning::PruningStatistics; -use las::{Header, Point}; +use las::Header; use object_store::{path::Path, ObjectMeta, ObjectStore, PutPayload}; - +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use sedona_geometry::bounding_box::BoundingBox; -use crate::las::{ - metadata::ChunkMeta, - reader::{read_point, record_decompressor}, -}; +use crate::las::{metadata::ChunkMeta, reader::record_decompressor}; /// Spatial statistics (extent) of LAS/LAZ chunks for pruning. /// /// It wraps a `RecordBatch` with x, y, z min and max values and row count per chunk. 
-#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub struct LasStatistics { pub values: RecordBatch, } @@ -208,6 +210,7 @@ pub async fn chunk_statistics( chunk_table: &[ChunkMeta], header: &Header, persist: bool, + parallel: bool, ) -> Result { let stats_path = Path::parse(format!("{}.stats", object_meta.location.as_ref()))?; @@ -234,9 +237,31 @@ pub async fn chunk_statistics( // extract statistics let mut builder = LasStatisticsBuilder::new_with_capacity(chunk_table.len()); - for chunk_meta in chunk_table { - let stats = extract_chunk_stats(store, object_meta, chunk_meta, header).await?; - builder.add_values(&stats, chunk_meta.num_points); + if parallel { + // While the method to infer the schema, adopted from the Parquet + // reader, uses concurrency (metadata fetch concurrency), it is not + // parallel. Extracting statistics in parallel can substantially improve + // the extraction process by a factor of the number of cores available. + let stats: Vec<[f64; 6]> = chunk_table + .par_iter() + .map(|chunk_meta| { + futures::executor::block_on(extract_chunk_stats( + store, + object_meta, + chunk_meta, + header, + )) + }) + .collect::, DataFusionError>>()?; + + for (stat, meta) in stats.iter().zip(chunk_table) { + builder.add_values(stat, meta.num_points); + } + } else { + for chunk_meta in chunk_table { + let stats = extract_chunk_stats(store, object_meta, chunk_meta, header).await?; + builder.add_values(&stats, chunk_meta.num_points); + } } let stats = builder.finish(); @@ -274,14 +299,14 @@ async fn extract_chunk_stats( f64::NEG_INFINITY, ]; - let extend = |stats: &mut [f64; 6], point: Point| { + let extend = |stats: &mut [f64; 6], point: [f64; 3]| { *stats = [ - stats[0].min(point.x), - stats[1].max(point.x), - stats[2].min(point.y), - stats[3].max(point.y), - stats[4].min(point.z), - stats[5].max(point.z), + stats[0].min(point[0]), + stats[1].max(point[0]), + stats[2].min(point[1]), + stats[3].max(point[1]), + stats[4].min(point[2]), + 
stats[5].max(point[2]), ]; }; @@ -301,14 +326,16 @@ async fn extract_chunk_stats( for _ in 0..chunk_meta.num_points { buffer.set_position(0); decompressor.decompress_next(buffer.get_mut())?; - let point = read_point(&mut buffer, header)?; + let point = parse_coords(&mut buffer, header)?; extend(&mut stats, point); } } else { let mut buffer = Cursor::new(bytes); - + // offset to next point after reading raw coords + let offset = header.point_format().len() as i64 - 3 * 4; for _ in 0..chunk_meta.num_points { - let point = read_point(&mut buffer, header)?; + let point = parse_coords(&mut buffer, header)?; + buffer.seek_relative(offset)?; extend(&mut stats, point); } } @@ -316,6 +343,14 @@ async fn extract_chunk_stats( Ok(stats) } +fn parse_coords(mut buffer: R, header: &Header) -> Result<[f64; 3], DataFusionError> { + let transforms = header.transforms(); + let x = transforms.x.direct(buffer.read_i32::()?); + let y = transforms.y.direct(buffer.read_i32::()?); + let z = transforms.z.direct(buffer.read_i32::()?); + Ok([x, y, z]) +} + #[cfg(test)] mod tests { use std::fs::File; @@ -327,10 +362,12 @@ mod tests { use object_store::{local::LocalFileSystem, path::Path, ObjectStore}; use sedona_geometry::bounding_box::BoundingBox; - use crate::{las::metadata::LasMetadataReader, options::PointcloudOptions}; + use crate::las::{ + metadata::LasMetadataReader, options::LasOptions, statistics::chunk_statistics, + }; #[tokio::test] - async fn chunk_statistics() { + async fn check_chunk_statistics() { for path in ["tests/data/large.las", "tests/data/large.laz"] { // read with `LasMetadataReader` let store = LocalFileSystem::new(); @@ -341,7 +378,7 @@ mod tests { let metadata = metadata_reader.fetch_metadata().await.unwrap(); assert!(metadata.statistics.is_none()); - let options = PointcloudOptions { + let options = LasOptions { collect_statistics: true, ..Default::default() }; @@ -376,6 +413,18 @@ mod tests { None )) ); + + let par_stats = chunk_statistics( + &store, + &object_meta, 
+ &metadata.chunk_table, + &metadata.header, + false, + true, + ) + .await + .unwrap(); + assert_eq!(statistics, &par_stats); } } @@ -406,7 +455,7 @@ mod tests { let location = Path::from_filesystem_path(&tmp_path).unwrap(); let object_meta = store.head(&location).await.unwrap(); - let options = PointcloudOptions { + let options = LasOptions { collect_statistics: true, persist_statistics: true, ..Default::default() diff --git a/rust/sedona-pointcloud/src/lib.rs b/rust/sedona-pointcloud/src/lib.rs index 7a75e0410..05df4dffc 100644 --- a/rust/sedona-pointcloud/src/lib.rs +++ b/rust/sedona-pointcloud/src/lib.rs @@ -16,4 +16,3 @@ // under the License. pub mod las; -pub mod options; diff --git a/rust/sedona-pointcloud/src/options.rs b/rust/sedona-pointcloud/src/options.rs deleted file mode 100644 index 51e5067bb..000000000 --- a/rust/sedona-pointcloud/src/options.rs +++ /dev/null @@ -1,103 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use std::{fmt::Display, str::FromStr}; - -use datafusion_common::{ - config::{ConfigExtension, ConfigField, Visit}, - error::DataFusionError, - extensions_options, -}; - -use crate::las::options::{LasExtraBytes, LasOptions}; - -/// Geometry representation -#[derive(Clone, Copy, Default, PartialEq, Eq, Debug)] -pub enum GeometryEncoding { - /// Use plain coordinates as three fields `x`, `y`, `z` with datatype Float64 encoding. - #[default] - Plain, - /// Resolves the coordinates to a fields `geometry` with WKB encoding. - Wkb, - /// Resolves the coordinates to a fields `geometry` with separated GeoArrow encoding. - Native, -} - -impl Display for GeometryEncoding { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - GeometryEncoding::Plain => f.write_str("plain"), - GeometryEncoding::Wkb => f.write_str("wkb"), - GeometryEncoding::Native => f.write_str("native"), - } - } -} - -impl FromStr for GeometryEncoding { - type Err = String; - - fn from_str(s: &str) -> Result { - match s.to_lowercase().as_str() { - "plain" => Ok(Self::Plain), - "wkb" => Ok(Self::Wkb), - "native" => Ok(Self::Native), - s => Err(format!("Unable to parse from `{s}`")), - } - } -} - -impl ConfigField for GeometryEncoding { - fn visit(&self, v: &mut V, key: &str, _description: &'static str) { - v.some( - &format!("{key}.geometry_encoding"), - self, - "Specify point geometry encoding", - ); - } - - fn set(&mut self, _key: &str, value: &str) -> Result<(), DataFusionError> { - *self = value.parse().map_err(DataFusionError::Configuration)?; - Ok(()) - } -} - -extensions_options! 
{ - /// Pointcloud configuration options - pub struct PointcloudOptions { - pub geometry_encoding: GeometryEncoding, default = GeometryEncoding::default() - pub collect_statistics: bool, default = false - pub persist_statistics: bool, default = false - pub las: LasOptions, default = LasOptions::default() - } - -} - -impl ConfigExtension for PointcloudOptions { - const PREFIX: &'static str = "pointcloud"; -} - -impl PointcloudOptions { - pub fn with_geometry_encoding(mut self, geometry_encoding: GeometryEncoding) -> Self { - self.geometry_encoding = geometry_encoding; - self - } - - pub fn with_las_extra_bytes(mut self, extra_bytes: LasExtraBytes) -> Self { - self.las.extra_bytes = extra_bytes; - self - } -} diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs index 0de882617..5f58ecf16 100644 --- a/rust/sedona/src/context.rs +++ b/rust/sedona/src/context.rs @@ -57,12 +57,9 @@ use sedona_geoparquet::{ provider::{geoparquet_listing_table, GeoParquetReadOptions}, }; #[cfg(feature = "pointcloud")] -use sedona_pointcloud::{ - las::{ - format::{Extension, LasFormatFactory}, - options::LasExtraBytes, - }, - options::{GeometryEncoding, PointcloudOptions}, +use sedona_pointcloud::las::{ + format::{Extension, LasFormatFactory}, + options::{GeometryEncoding, LasExtraBytes, LasOptions}, }; /// Sedona SessionContext wrapper @@ -106,7 +103,7 @@ impl SedonaContext { let session_config = add_sedona_option_extension(session_config); #[cfg(feature = "pointcloud")] let session_config = session_config.with_option_extension( - PointcloudOptions::default() + LasOptions::default() .with_geometry_encoding(GeometryEncoding::Wkb) .with_las_extra_bytes(LasExtraBytes::Typed), );