1717
1818use arrow:: array:: BooleanBufferBuilder ;
1919use arrow_schema:: SchemaRef ;
20- use datafusion_physical_plan:: metrics:: { self , ExecutionPlanMetricsSet , MetricBuilder } ;
2120use sedona_common:: SpatialJoinOptions ;
2221use sedona_expr:: statistics:: GeoStatistics ;
2322use std:: sync:: Arc ;
2423
25- use datafusion_common:: { utils:: proxy:: VecAllocExt , Result } ;
26- use datafusion_expr:: JoinType ;
27- use futures:: StreamExt ;
28- use geo_index:: rtree:: { sort:: HilbertSort , RTree , RTreeBuilder , RTreeIndex } ;
29- use parking_lot:: Mutex ;
30- use std:: sync:: atomic:: AtomicUsize ;
31-
3224use crate :: index:: spatial_index:: SpatialIndexRef ;
25+ use crate :: index:: spatial_index_builder:: { SpatialIndexBuilder , SpatialJoinBuildMetrics } ;
3326use crate :: {
3427 evaluated_batch:: { evaluated_batch_stream:: SendableEvaluatedBatchStream , EvaluatedBatch } ,
3528 index:: { default_spatial_index:: DefaultSpatialIndex , knn_adapter:: KnnComponents } ,
@@ -38,6 +31,13 @@ use crate::{
3831 spatial_predicate:: SpatialPredicate ,
3932 utils:: join_utils:: need_produce_result_in_final,
4033} ;
34+ use async_trait:: async_trait;
35+ use datafusion_common:: { utils:: proxy:: VecAllocExt , Result } ;
36+ use datafusion_expr:: JoinType ;
37+ use futures:: StreamExt ;
38+ use geo_index:: rtree:: { sort:: HilbertSort , RTree , RTreeBuilder , RTreeIndex } ;
39+ use parking_lot:: Mutex ;
40+ use std:: sync:: atomic:: AtomicUsize ;
4141
4242// Type aliases for better readability
4343type SpatialRTree = RTree < f32 > ;
@@ -54,7 +54,7 @@ const RTREE_MEMORY_ESTIMATE_PER_RECT: usize = 60;
5454/// 2. Building the spatial R-tree index
5555/// 3. Setting up memory tracking and visited bitmaps
5656/// 4. Configuring prepared geometries based on execution mode
57- pub struct SpatialIndexBuilder {
57+ pub struct DefaultSpatialIndexBuilder {
5858 schema : SchemaRef ,
5959 spatial_predicate : SpatialPredicate ,
6060 options : SpatialJoinOptions ,
@@ -72,25 +72,7 @@ pub struct SpatialIndexBuilder {
7272 memory_used : usize ,
7373}
7474
75- /// Metrics for the build phase of the spatial join.
76- #[ derive( Clone , Debug , Default ) ]
77- pub struct SpatialJoinBuildMetrics {
78- /// Total time for collecting build-side of join
79- pub ( crate ) build_time : metrics:: Time ,
80- /// Memory used by the spatial-index in bytes
81- pub ( crate ) build_mem_used : metrics:: Gauge ,
82- }
83-
84- impl SpatialJoinBuildMetrics {
85- pub fn new ( partition : usize , metrics : & ExecutionPlanMetricsSet ) -> Self {
86- Self {
87- build_time : MetricBuilder :: new ( metrics) . subset_time ( "build_time" , partition) ,
88- build_mem_used : MetricBuilder :: new ( metrics) . gauge ( "build_mem_used" , partition) ,
89- }
90- }
91- }
92-
93- impl SpatialIndexBuilder {
75+ impl DefaultSpatialIndexBuilder {
9476 /// Create a new builder with the given configuration.
9577 pub fn new (
9678 schema : SchemaRef ,
@@ -113,55 +95,6 @@ impl SpatialIndexBuilder {
11395 } )
11496 }
11597
116- /// Estimate the amount of memory required by the R-tree index and evaluating spatial predicates.
117- /// The estimated memory usage does not include the memory required for holding the build side
118- /// batches.
119- pub fn estimate_extra_memory_usage (
120- geo_stats : & GeoStatistics ,
121- spatial_predicate : & SpatialPredicate ,
122- options : & SpatialJoinOptions ,
123- ) -> usize {
124- // Estimate the amount of memory needed by the refiner
125- let num_geoms = geo_stats. total_geometries ( ) . unwrap_or ( 0 ) as usize ;
126- let refiner = create_refiner (
127- options. spatial_library ,
128- spatial_predicate,
129- options. clone ( ) ,
130- num_geoms,
131- geo_stats. clone ( ) ,
132- ) ;
133- let refiner_mem_usage = refiner. estimate_max_memory_usage ( geo_stats) ;
134-
135- let knn_components_mem_usage =
136- if matches ! ( spatial_predicate, SpatialPredicate :: KNearestNeighbors ( _) ) {
137- KnnComponents :: estimate_max_memory_usage ( geo_stats)
138- } else {
139- 0
140- } ;
141-
142- // Estimate the amount of memory needed for the R-tree
143- let rtree_mem_usage = num_geoms * RTREE_MEMORY_ESTIMATE_PER_RECT ;
144-
145- // The final estimation is the sum of all above
146- refiner_mem_usage + knn_components_mem_usage + rtree_mem_usage
147- }
148-
149- /// Add a geometry batch to be indexed.
150- ///
151- /// This method accumulates geometry batches that will be used to build the spatial index.
152- /// Each batch contains processed geometry data along with memory usage information.
153- pub fn add_batch ( & mut self , indexed_batch : EvaluatedBatch ) -> Result < ( ) > {
154- let in_mem_size = indexed_batch. in_mem_size ( ) ?;
155- self . indexed_batches . push ( indexed_batch) ;
156- self . record_memory_usage ( in_mem_size) ;
157- Ok ( ( ) )
158- }
159-
160- pub fn merge_stats ( & mut self , stats : GeoStatistics ) -> & mut Self {
161- self . stats . merge ( & stats) ;
162- self
163- }
164-
16598 /// Build the spatial R-tree index from collected geometry batches.
16699 fn build_rtree ( & mut self ) -> Result < RTreeBuildResult > {
167100 let build_timer = self . metrics . build_time . timer ( ) ;
@@ -244,8 +177,57 @@ impl SpatialIndexBuilder {
244177 geom_idx_vec
245178 }
246179
247- /// Finish building and return the completed SpatialIndex.
248- pub fn finish ( mut self ) -> Result < SpatialIndexRef > {
180+ fn record_memory_usage ( & mut self , bytes : usize ) {
181+ self . memory_used += bytes;
182+ self . metrics . build_mem_used . set_max ( self . memory_used ) ;
183+ }
184+ }
185+
186+ #[ async_trait]
187+ impl SpatialIndexBuilder for DefaultSpatialIndexBuilder {
188+ fn estimate_extra_memory_usage (
189+ geo_stats : & GeoStatistics ,
190+ spatial_predicate : & SpatialPredicate ,
191+ options : & SpatialJoinOptions ,
192+ ) -> usize {
193+ // Estimate the amount of memory needed by the refiner
194+ let num_geoms = geo_stats. total_geometries ( ) . unwrap_or ( 0 ) as usize ;
195+ let refiner = create_refiner (
196+ options. spatial_library ,
197+ spatial_predicate,
198+ options. clone ( ) ,
199+ num_geoms,
200+ geo_stats. clone ( ) ,
201+ ) ;
202+ let refiner_mem_usage = refiner. estimate_max_memory_usage ( geo_stats) ;
203+
204+ let knn_components_mem_usage =
205+ if matches ! ( spatial_predicate, SpatialPredicate :: KNearestNeighbors ( _) ) {
206+ KnnComponents :: estimate_max_memory_usage ( geo_stats)
207+ } else {
208+ 0
209+ } ;
210+
211+ // Estimate the amount of memory needed for the R-tree
212+ let rtree_mem_usage = num_geoms * RTREE_MEMORY_ESTIMATE_PER_RECT ;
213+
214+ // The final estimation is the sum of all above
215+ refiner_mem_usage + knn_components_mem_usage + rtree_mem_usage
216+ }
217+
218+ fn add_batch ( & mut self , indexed_batch : EvaluatedBatch ) -> Result < ( ) > {
219+ let in_mem_size = indexed_batch. in_mem_size ( ) ?;
220+ self . indexed_batches . push ( indexed_batch) ;
221+ self . record_memory_usage ( in_mem_size) ;
222+ Ok ( ( ) )
223+ }
224+
225+ fn merge_stats ( & mut self , stats : GeoStatistics ) -> & mut Self {
226+ self . stats . merge ( & stats) ;
227+ self
228+ }
229+
230+ fn finish ( mut self ) -> Result < SpatialIndexRef > {
249231 if self . indexed_batches . is_empty ( ) {
250232 return Ok ( Arc :: new ( DefaultSpatialIndex :: empty (
251233 self . spatial_predicate ,
@@ -309,7 +291,7 @@ impl SpatialIndexBuilder {
309291 ) ) )
310292 }
311293
312- pub async fn add_stream (
294+ async fn add_stream (
313295 & mut self ,
314296 mut stream : SendableEvaluatedBatchStream ,
315297 geo_statistics : GeoStatistics ,
@@ -321,9 +303,4 @@ impl SpatialIndexBuilder {
321303 self . merge_stats ( geo_statistics) ;
322304 Ok ( ( ) )
323305 }
324-
325- fn record_memory_usage ( & mut self , bytes : usize ) {
326- self . memory_used += bytes;
327- self . metrics . build_mem_used . set_max ( self . memory_used ) ;
328- }
329306}
0 commit comments