Skip to content

Commit 78e9af0

Browse files
add partitioning on file reads and parallelize aggregations if partitioning permits
1 parent: b1af8fe · commit: 78e9af0

File tree

7 files changed

+551
-52
lines changed

7 files changed

+551
-52
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 12 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -89,7 +89,7 @@ use datafusion_expr::{
8989
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
9090
use datafusion_physical_expr::expressions::Literal;
9191
use datafusion_physical_expr::{
92-
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
92+
create_physical_sort_exprs, Distribution, LexOrdering, PhysicalSortExpr,
9393
};
9494
use datafusion_physical_optimizer::PhysicalOptimizerRule;
9595
use datafusion_physical_plan::empty::EmptyExec;
@@ -809,12 +809,19 @@ impl DefaultPhysicalPlanner {
809809
// `AggregateFunctionExpr`/`PhysicalSortExpr` objects.
810810
let updated_aggregates = initial_aggr.aggr_expr().to_vec();
811811

812-
let next_partition_mode = if can_repartition {
813-
// construct a second aggregation with 'AggregateMode::FinalPartitioned'
812+
// Check if input is already hash-partitioned on a subset of the group-by
813+
// columns.
814+
let input_already_partitioned = !groups.is_empty()
815+
&& initial_aggr.input().output_partitioning().partition_count() > 1
816+
&& initial_aggr.input().output_partitioning().satisfy(
817+
&Distribution::HashPartitioned(groups.input_exprs()),
818+
&initial_aggr.input().equivalence_properties(),
819+
);
820+
821+
let next_partition_mode = if can_repartition || input_already_partitioned
822+
{
814823
AggregateMode::FinalPartitioned
815824
} else {
816-
// construct a second aggregation, keeping the final column name equal to the
817-
// first aggregation and the expressions corresponding to the respective aggregate
818825
AggregateMode::Final
819826
};
820827

datafusion/datasource/src/file_scan_config.rs

Lines changed: 211 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -570,6 +570,30 @@ impl DataSource for FileScanConfig {
570570
}
571571

572572
fn output_partitioning(&self) -> Partitioning {
573+
if self.is_properly_partitioned() {
574+
// Use the projected schema to find the column index, not the virtual
575+
// partition column position.
576+
let partition_exprs: Vec<Arc<dyn PhysicalExpr>> = self
577+
.table_partition_cols()
578+
.iter()
579+
.filter_map(|field| {
580+
self.projected_schema()
581+
.fields()
582+
.iter()
583+
.position(|f| f.name() == field.name())
584+
.map(|idx| {
585+
Arc::new(Column::new(field.name(), idx))
586+
as Arc<dyn PhysicalExpr>
587+
})
588+
})
589+
.collect();
590+
591+
if partition_exprs.len() == self.table_partition_cols().len() {
592+
return Partitioning::Hash(partition_exprs, self.file_groups.len());
593+
}
594+
}
595+
596+
// Fall back to UnknownPartitioning
573597
Partitioning::UnknownPartitioning(self.file_groups.len())
574598
}
575599

@@ -736,6 +760,25 @@ impl FileScanConfig {
736760
}
737761
}
738762

763+
fn is_properly_partitioned(&self) -> bool {
764+
if self.file_groups.is_empty() || self.table_partition_cols().is_empty() {
765+
return false;
766+
}
767+
768+
self.file_groups.iter().all(|file_group| {
769+
let files = file_group.files();
770+
if files.is_empty() {
771+
return true;
772+
}
773+
774+
let reference_values = &files[0].partition_values;
775+
776+
files
777+
.iter()
778+
.all(|file| file.partition_values == *reference_values)
779+
})
780+
}
781+
739782
pub fn projected_stats(&self) -> Statistics {
740783
let statistics = self.file_source.statistics().unwrap();
741784

@@ -2673,4 +2716,172 @@ mod tests {
26732716
assert_eq!(partition_stats.num_rows, Precision::Exact(100));
26742717
assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
26752718
}
2719+
2720+
#[test]
2721+
fn test_is_properly_partitioned() {
2722+
// All file groups have consistent partition values
2723+
let file_schema = Arc::new(Schema::new(vec![Field::new(
2724+
"value",
2725+
DataType::Int32,
2726+
false,
2727+
)]));
2728+
2729+
let partition_cols = vec![Field::new(
2730+
"f_dkey",
2731+
wrap_partition_type_in_dict(DataType::Utf8),
2732+
false,
2733+
)];
2734+
2735+
// Case 1: Multiple file groups, each with consistent values
2736+
let file_groups = vec![
2737+
FileGroup::new(vec![
2738+
{
2739+
let mut file =
2740+
PartitionedFile::new("data/f_dkey=A/file1.parquet", 1024);
2741+
file.partition_values =
2742+
vec![wrap_partition_value_in_dict(ScalarValue::from("A"))];
2743+
file
2744+
},
2745+
{
2746+
let mut file =
2747+
PartitionedFile::new("data/f_dkey=A/file2.parquet", 2048);
2748+
file.partition_values =
2749+
vec![wrap_partition_value_in_dict(ScalarValue::from("A"))];
2750+
file
2751+
},
2752+
]),
2753+
FileGroup::new(vec![{
2754+
let mut file = PartitionedFile::new("data/f_dkey=B/file1.parquet", 512);
2755+
file.partition_values =
2756+
vec![wrap_partition_value_in_dict(ScalarValue::from("B"))];
2757+
file
2758+
}]),
2759+
FileGroup::new(vec![{
2760+
let mut file = PartitionedFile::new("data/f_dkey=C/file1.parquet", 1024);
2761+
file.partition_values =
2762+
vec![wrap_partition_value_in_dict(ScalarValue::from("C"))];
2763+
file
2764+
}]),
2765+
];
2766+
2767+
let config = FileScanConfigBuilder::new(
2768+
ObjectStoreUrl::parse("test:///").unwrap(),
2769+
Arc::new(MockSource::new(TableSchema::new(
2770+
Arc::clone(&file_schema),
2771+
partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2772+
))),
2773+
)
2774+
.with_file_groups(file_groups)
2775+
.build();
2776+
2777+
assert!(
2778+
config.is_properly_partitioned(),
2779+
"Expected true when all file groups have consistent partition values"
2780+
);
2781+
2782+
// Case 2: Mixed partition values within a group (should return false)
2783+
let mixed_file_groups = vec![FileGroup::new(vec![
2784+
{
2785+
let mut file = PartitionedFile::new("data/f_dkey=A/file1.parquet", 1024);
2786+
file.partition_values =
2787+
vec![wrap_partition_value_in_dict(ScalarValue::from("A"))];
2788+
file
2789+
},
2790+
{
2791+
let mut file = PartitionedFile::new("data/f_dkey=B/file2.parquet", 2048);
2792+
file.partition_values =
2793+
vec![wrap_partition_value_in_dict(ScalarValue::from("B"))];
2794+
file
2795+
},
2796+
])];
2797+
2798+
let config_mixed = FileScanConfigBuilder::new(
2799+
ObjectStoreUrl::parse("test:///").unwrap(),
2800+
Arc::new(MockSource::new(TableSchema::new(
2801+
Arc::clone(&file_schema),
2802+
partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2803+
))),
2804+
)
2805+
.with_file_groups(mixed_file_groups)
2806+
.build();
2807+
2808+
assert!(
2809+
!config_mixed.is_properly_partitioned(),
2810+
"Expected false when file group has mixed partition values"
2811+
);
2812+
2813+
// Case 3: Empty file groups
2814+
let config_empty = FileScanConfigBuilder::new(
2815+
ObjectStoreUrl::parse("test:///").unwrap(),
2816+
Arc::new(MockSource::new(TableSchema::new(
2817+
Arc::clone(&file_schema),
2818+
partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2819+
))),
2820+
)
2821+
.with_file_groups(vec![])
2822+
.build();
2823+
2824+
assert!(
2825+
!config_empty.is_properly_partitioned(),
2826+
"Expected false for empty file groups"
2827+
);
2828+
}
2829+
2830+
#[test]
2831+
fn test_output_partitioning_column_indices() {
2832+
// Test that output_partitioning returns correct column indices in projected schema
2833+
let file_schema = Arc::new(Schema::new(vec![
2834+
Field::new("value", DataType::Int32, false),
2835+
Field::new("timestamp", DataType::Int64, false),
2836+
]));
2837+
2838+
let partition_cols = vec![Field::new(
2839+
"f_dkey",
2840+
wrap_partition_type_in_dict(DataType::Utf8),
2841+
false,
2842+
)];
2843+
2844+
let file_groups = vec![FileGroup::new(vec![{
2845+
let mut file = PartitionedFile::new("data/f_dkey=A/file1.parquet", 1024);
2846+
file.partition_values =
2847+
vec![wrap_partition_value_in_dict(ScalarValue::from("A"))];
2848+
file
2849+
}])];
2850+
2851+
// Full schema is [value@0, timestamp@1, f_dkey@2]
2852+
// Project indices [0, 2] -> [value@0, f_dkey@1] in projected schema
2853+
let config = FileScanConfigBuilder::new(
2854+
ObjectStoreUrl::parse("test:///").unwrap(),
2855+
Arc::new(MockSource::new(TableSchema::new(
2856+
Arc::clone(&file_schema),
2857+
partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2858+
))),
2859+
)
2860+
.with_projection_indices(Some(vec![0, 2]))
2861+
.with_file_groups(file_groups)
2862+
.build();
2863+
2864+
let partitioning = config.output_partitioning();
2865+
2866+
match partitioning {
2867+
Partitioning::Hash(exprs, count) => {
2868+
assert_eq!(count, 1, "Expected 1 partition (single file group)");
2869+
assert_eq!(exprs.len(), 1, "Expected 1 partition expression");
2870+
2871+
// The partition expression should reference f_dkey at index 1
2872+
// in the projected schema [value@0, f_dkey@1]
2873+
let col = exprs[0]
2874+
.as_any()
2875+
.downcast_ref::<Column>()
2876+
.expect("Expected Column expression");
2877+
assert_eq!(col.name(), "f_dkey", "Column name should be f_dkey");
2878+
assert_eq!(
2879+
col.index(),
2880+
1,
2881+
"Column index should be 1 in projected schema [value@0, f_dkey@1]"
2882+
);
2883+
}
2884+
_ => panic!("Expected Hash partitioning, got: {:?}", partitioning),
2885+
}
2886+
}
26762887
}

0 commit comments

Comments (0)