Skip to content

Commit 2ae2e26

Browse files
committed
Optional parallel statistics extraction
1 parent de0bcbc commit 2ae2e26

6 files changed

Lines changed: 57 additions & 22 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/sedona-pointcloud/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ arrow-ipc = { workspace = true }
3636
arrow-schema = { workspace = true }
3737
async-stream = "0.3.6"
3838
async-trait = { workspace = true }
39+
byteorder = { workspace = true }
3940
bytes = { workspace = true }
4041
datafusion-catalog = { workspace = true }
4142
datafusion-common = { workspace = true }
@@ -51,6 +52,7 @@ las = { version = "0.9.10", features = ["laz"] }
5152
las-crs = { version = "1.0.0" }
5253
laz = "0.12.0"
5354
object_store = { workspace = true }
55+
rayon = "1.11.0"
5456
sedona-expr = { workspace = true }
5557
sedona-geometry = { workspace = true }
5658

rust/sedona-pointcloud/src/las/metadata.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,7 @@ impl<'a> LasMetadataReader<'a> {
153153
&chunk_table,
154154
&header,
155155
options.persist_statistics,
156+
options.parallel_statistics_extraction,
156157
)
157158
.await?,
158159
)
@@ -366,9 +367,9 @@ pub async fn fetch_chunk_table(
366367
header: &Header,
367368
) -> Result<Vec<ChunkMeta>, Box<dyn Error + Send + Sync>> {
368369
if header.laz_vlr().is_ok() {
369-
laz_chunk_table(store, object_meta, &header).await
370+
laz_chunk_table(store, object_meta, header).await
370371
} else {
371-
las_chunk_table(&header).await
372+
las_chunk_table(header).await
372373
}
373374
}
374375

rust/sedona-pointcloud/src/las/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ pub fn record_decompressor(
187187
Ok(decompressor)
188188
}
189189

190-
pub(crate) fn read_point<R: Read>(buffer: R, header: &Header) -> Result<Point, DataFusionError> {
190+
fn read_point<R: Read>(buffer: R, header: &Header) -> Result<Point, DataFusionError> {
191191
RawPoint::read_from(buffer, header.point_format())
192192
.map(|raw_point| Point::new(raw_point, header.transforms()))
193193
.map_err(|e| DataFusionError::External(Box::new(e)))

rust/sedona-pointcloud/src/las/statistics.rs

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,11 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::{collections::HashSet, io::Cursor, sync::Arc};
18+
use std::{
19+
collections::HashSet,
20+
io::{Cursor, Read},
21+
sync::Arc,
22+
};
1923

2024
use arrow_array::{
2125
builder::PrimitiveBuilder,
@@ -25,17 +29,15 @@ use arrow_array::{
2529
};
2630
use arrow_ipc::{reader::FileReader, writer::FileWriter};
2731
use arrow_schema::{DataType, Field, Schema};
32+
use byteorder::{LittleEndian, ReadBytesExt};
2833
use datafusion_common::{arrow::compute::concat_batches, Column, DataFusionError, ScalarValue};
2934
use datafusion_pruning::PruningStatistics;
30-
use las::{Header, Point};
35+
use las::Header;
3136
use object_store::{path::Path, ObjectMeta, ObjectStore, PutPayload};
32-
37+
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
3338
use sedona_geometry::bounding_box::BoundingBox;
3439

35-
use crate::las::{
36-
metadata::ChunkMeta,
37-
reader::{read_point, record_decompressor},
38-
};
40+
use crate::las::{metadata::ChunkMeta, reader::record_decompressor};
3941

4042
/// Spatial statistics (extent) of LAS/LAZ chunks for pruning.
4143
///
@@ -208,6 +210,7 @@ pub async fn chunk_statistics(
208210
chunk_table: &[ChunkMeta],
209211
header: &Header,
210212
persist: bool,
213+
parallel: bool,
211214
) -> Result<LasStatistics, DataFusionError> {
212215
let stats_path = Path::parse(format!("{}.stats", object_meta.location.as_ref()))?;
213216

@@ -234,9 +237,27 @@ pub async fn chunk_statistics(
234237
// extract statistics
235238
let mut builder = LasStatisticsBuilder::new_with_capacity(chunk_table.len());
236239

237-
for chunk_meta in chunk_table {
238-
let stats = extract_chunk_stats(store, object_meta, chunk_meta, header).await?;
239-
builder.add_values(&stats, chunk_meta.num_points);
240+
if parallel {
241+
let stats: Vec<[f64; 6]> = chunk_table
242+
.par_iter()
243+
.map(|chunk_meta| {
244+
futures::executor::block_on(extract_chunk_stats(
245+
store,
246+
object_meta,
247+
chunk_meta,
248+
header,
249+
))
250+
})
251+
.collect::<Result<Vec<[f64; 6]>, DataFusionError>>()?;
252+
253+
for (stat, meta) in stats.iter().zip(chunk_table) {
254+
builder.add_values(stat, meta.num_points);
255+
}
256+
} else {
257+
for chunk_meta in chunk_table {
258+
let stats = extract_chunk_stats(store, object_meta, chunk_meta, header).await?;
259+
builder.add_values(&stats, chunk_meta.num_points);
260+
}
240261
}
241262

242263
let stats = builder.finish();
@@ -274,14 +295,14 @@ async fn extract_chunk_stats(
274295
f64::NEG_INFINITY,
275296
];
276297

277-
let extend = |stats: &mut [f64; 6], point: Point| {
298+
let extend = |stats: &mut [f64; 6], point: [f64; 3]| {
278299
*stats = [
279-
stats[0].min(point.x),
280-
stats[1].max(point.x),
281-
stats[2].min(point.y),
282-
stats[3].max(point.y),
283-
stats[4].min(point.z),
284-
stats[5].max(point.z),
300+
stats[0].min(point[0]),
301+
stats[1].max(point[0]),
302+
stats[2].min(point[1]),
303+
stats[3].max(point[1]),
304+
stats[4].min(point[2]),
305+
stats[5].max(point[2]),
285306
];
286307
};
287308

@@ -301,21 +322,29 @@ async fn extract_chunk_stats(
301322
for _ in 0..chunk_meta.num_points {
302323
buffer.set_position(0);
303324
decompressor.decompress_next(buffer.get_mut())?;
304-
let point = read_point(&mut buffer, header)?;
325+
let point = parse_coords(&mut buffer, header)?;
305326
extend(&mut stats, point);
306327
}
307328
} else {
308329
let mut buffer = Cursor::new(bytes);
309330

310331
for _ in 0..chunk_meta.num_points {
311-
let point = read_point(&mut buffer, header)?;
332+
let point = parse_coords(&mut buffer, header)?;
312333
extend(&mut stats, point);
313334
}
314335
}
315336

316337
Ok(stats)
317338
}
318339

340+
fn parse_coords<R: Read>(mut buffer: R, header: &Header) -> Result<[f64; 3], DataFusionError> {
341+
let transforms = header.transforms();
342+
let x = transforms.x.direct(buffer.read_i32::<LittleEndian>()?);
343+
let y = transforms.y.direct(buffer.read_i32::<LittleEndian>()?);
344+
let z = transforms.z.direct(buffer.read_i32::<LittleEndian>()?);
345+
Ok([x, y, z])
346+
}
347+
319348
#[cfg(test)]
320349
mod tests {
321350
use std::fs::File;

rust/sedona-pointcloud/src/options.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ extensions_options! {
8080
pub struct PointcloudOptions {
8181
pub geometry_encoding: GeometryEncoding, default = GeometryEncoding::default()
8282
pub collect_statistics: bool, default = false
83+
pub parallel_statistics_extraction: bool, default = false
8384
pub persist_statistics: bool, default = false
8485
pub round_robin_partitioning: bool, default = false
8586
pub las: LasOptions, default = LasOptions::default()

0 commit comments

Comments (0)