Skip to content

Commit be508b7

Browse files
Add partitioning on file reads and parallelize aggregations if partitioning permits
1 parent b1af8fe commit be508b7

File tree

11 files changed

+618
-19
lines changed

11 files changed

+618
-19
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 12 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -89,7 +89,7 @@ use datafusion_expr::{
8989
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
9090
use datafusion_physical_expr::expressions::Literal;
9191
use datafusion_physical_expr::{
92-
create_physical_sort_exprs, LexOrdering, PhysicalSortExpr,
92+
create_physical_sort_exprs, Distribution, LexOrdering, PhysicalSortExpr,
9393
};
9494
use datafusion_physical_optimizer::PhysicalOptimizerRule;
9595
use datafusion_physical_plan::empty::EmptyExec;
@@ -809,12 +809,19 @@ impl DefaultPhysicalPlanner {
809809
// `AggregateFunctionExpr`/`PhysicalSortExpr` objects.
810810
let updated_aggregates = initial_aggr.aggr_expr().to_vec();
811811

812-
let next_partition_mode = if can_repartition {
813-
// construct a second aggregation with 'AggregateMode::FinalPartitioned'
812+
// Check if input is already hash-partitioned on a subset of the group-by
813+
// columns.
814+
let input_already_partitioned = !groups.is_empty()
815+
&& initial_aggr.input().output_partitioning().partition_count() > 1
816+
&& initial_aggr.input().output_partitioning().satisfy(
817+
&Distribution::HashPartitioned(groups.input_exprs()),
818+
&initial_aggr.input().equivalence_properties(),
819+
);
820+
821+
let next_partition_mode = if can_repartition || input_already_partitioned
822+
{
814823
AggregateMode::FinalPartitioned
815824
} else {
816-
// construct a second aggregation, keeping the final column name equal to the
817-
// first aggregation and the expressions corresponding to the respective aggregate
818825
AggregateMode::Final
819826
};
820827

datafusion/datasource/src/file_scan_config.rs

Lines changed: 214 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -570,6 +570,35 @@ impl DataSource for FileScanConfig {
570570
}
571571

572572
fn output_partitioning(&self) -> Partitioning {
573+
if self.is_properly_partitioned() {
574+
// Use the projected schema to find the column index, not the virtual
575+
// partition column position.
576+
let partition_exprs: Vec<Arc<dyn PhysicalExpr>> = self
577+
.table_partition_cols()
578+
.iter()
579+
.filter_map(|field| {
580+
self.projected_schema()
581+
.fields()
582+
.iter()
583+
.position(|f| f.name() == field.name())
584+
.map(|idx| {
585+
Arc::new(Column::new(field.name(), idx))
586+
as Arc<dyn PhysicalExpr>
587+
})
588+
})
589+
.collect();
590+
591+
if partition_exprs.len() == self.table_partition_cols().len() {
592+
// Use SingleValuePartitioned for Hive-style partitioning where each file group
593+
// corresponds to exactly one distinct value of the partition columns.
594+
return Partitioning::SingleValuePartitioned(
595+
partition_exprs,
596+
self.file_groups.len(),
597+
);
598+
}
599+
}
600+
601+
// Fall back to UnknownPartitioning
573602
Partitioning::UnknownPartitioning(self.file_groups.len())
574603
}
575604

@@ -736,6 +765,25 @@ impl FileScanConfig {
736765
}
737766
}
738767

768+
fn is_properly_partitioned(&self) -> bool {
769+
if self.file_groups.is_empty() || self.table_partition_cols().is_empty() {
770+
return false;
771+
}
772+
773+
self.file_groups.iter().all(|file_group| {
774+
let files = file_group.files();
775+
if files.is_empty() {
776+
return true;
777+
}
778+
779+
let reference_values = &files[0].partition_values;
780+
781+
files
782+
.iter()
783+
.all(|file| file.partition_values == *reference_values)
784+
})
785+
}
786+
739787
pub fn projected_stats(&self) -> Statistics {
740788
let statistics = self.file_source.statistics().unwrap();
741789

@@ -2673,4 +2721,170 @@ mod tests {
26732721
assert_eq!(partition_stats.num_rows, Precision::Exact(100));
26742722
assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
26752723
}
2724+
2725+
#[test]
2726+
fn test_is_properly_partitioned() {
2727+
// All file groups have consistent partition values
2728+
let file_schema = Arc::new(Schema::new(vec![Field::new(
2729+
"value",
2730+
DataType::Int32,
2731+
false,
2732+
)]));
2733+
2734+
let partition_cols = [Field::new(
2735+
"f_dkey",
2736+
wrap_partition_type_in_dict(DataType::Utf8),
2737+
false,
2738+
)];
2739+
2740+
// Case 1: Multiple file groups, each with consistent values
2741+
let file_groups = vec![
2742+
FileGroup::new(vec![
2743+
{
2744+
let mut file =
2745+
PartitionedFile::new("data/f_dkey=A/file1.parquet", 1024);
2746+
file.partition_values =
2747+
vec![wrap_partition_value_in_dict(ScalarValue::from("A"))];
2748+
file
2749+
},
2750+
{
2751+
let mut file =
2752+
PartitionedFile::new("data/f_dkey=A/file2.parquet", 2048);
2753+
file.partition_values =
2754+
vec![wrap_partition_value_in_dict(ScalarValue::from("A"))];
2755+
file
2756+
},
2757+
]),
2758+
FileGroup::new(vec![{
2759+
let mut file = PartitionedFile::new("data/f_dkey=B/file1.parquet", 512);
2760+
file.partition_values =
2761+
vec![wrap_partition_value_in_dict(ScalarValue::from("B"))];
2762+
file
2763+
}]),
2764+
FileGroup::new(vec![{
2765+
let mut file = PartitionedFile::new("data/f_dkey=C/file1.parquet", 1024);
2766+
file.partition_values =
2767+
vec![wrap_partition_value_in_dict(ScalarValue::from("C"))];
2768+
file
2769+
}]),
2770+
];
2771+
2772+
let config = FileScanConfigBuilder::new(
2773+
ObjectStoreUrl::parse("test:///").unwrap(),
2774+
Arc::new(MockSource::new(TableSchema::new(
2775+
Arc::clone(&file_schema),
2776+
partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2777+
))),
2778+
)
2779+
.with_file_groups(file_groups)
2780+
.build();
2781+
2782+
assert!(
2783+
config.is_properly_partitioned(),
2784+
"Expected true when all file groups have consistent partition values"
2785+
);
2786+
2787+
// Case 2: Mixed partition values within a group (should return false)
2788+
let mixed_file_groups = vec![FileGroup::new(vec![
2789+
{
2790+
let mut file = PartitionedFile::new("data/f_dkey=A/file1.parquet", 1024);
2791+
file.partition_values =
2792+
vec![wrap_partition_value_in_dict(ScalarValue::from("A"))];
2793+
file
2794+
},
2795+
{
2796+
let mut file = PartitionedFile::new("data/f_dkey=B/file2.parquet", 2048);
2797+
file.partition_values =
2798+
vec![wrap_partition_value_in_dict(ScalarValue::from("B"))];
2799+
file
2800+
},
2801+
])];
2802+
2803+
let config_mixed = FileScanConfigBuilder::new(
2804+
ObjectStoreUrl::parse("test:///").unwrap(),
2805+
Arc::new(MockSource::new(TableSchema::new(
2806+
Arc::clone(&file_schema),
2807+
partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2808+
))),
2809+
)
2810+
.with_file_groups(mixed_file_groups)
2811+
.build();
2812+
2813+
assert!(
2814+
!config_mixed.is_properly_partitioned(),
2815+
"Expected false when file group has mixed partition values"
2816+
);
2817+
2818+
// Case 3: Empty file groups
2819+
let config_empty = FileScanConfigBuilder::new(
2820+
ObjectStoreUrl::parse("test:///").unwrap(),
2821+
Arc::new(MockSource::new(TableSchema::new(
2822+
Arc::clone(&file_schema),
2823+
partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2824+
))),
2825+
)
2826+
.with_file_groups(vec![])
2827+
.build();
2828+
2829+
assert!(
2830+
!config_empty.is_properly_partitioned(),
2831+
"Expected false for empty file groups"
2832+
);
2833+
}
2834+
2835+
#[test]
2836+
fn test_output_partitioning_column_indices() {
2837+
// Test that output_partitioning returns correct column indices in projected schema
2838+
let file_schema = Arc::new(Schema::new(vec![
2839+
Field::new("value", DataType::Int32, false),
2840+
Field::new("timestamp", DataType::Int64, false),
2841+
]));
2842+
2843+
let partition_cols = [Field::new(
2844+
"f_dkey",
2845+
wrap_partition_type_in_dict(DataType::Utf8),
2846+
false,
2847+
)];
2848+
2849+
let file_groups = vec![FileGroup::new(vec![{
2850+
let mut file = PartitionedFile::new("data/f_dkey=A/file1.parquet", 1024);
2851+
file.partition_values =
2852+
vec![wrap_partition_value_in_dict(ScalarValue::from("A"))];
2853+
file
2854+
}])];
2855+
2856+
// Full schema is [value@0, timestamp@1, f_dkey@2]
2857+
// Project indices [0, 2] -> [value@0, f_dkey@1] in projected schema
2858+
let config = FileScanConfigBuilder::new(
2859+
ObjectStoreUrl::parse("test:///").unwrap(),
2860+
Arc::new(MockSource::new(TableSchema::new(
2861+
Arc::clone(&file_schema),
2862+
partition_cols.iter().map(|f| Arc::new(f.clone())).collect(),
2863+
))),
2864+
)
2865+
.with_projection_indices(Some(vec![0, 2]))
2866+
.with_file_groups(file_groups)
2867+
.build();
2868+
2869+
let partitioning = config.output_partitioning();
2870+
2871+
match partitioning {
2872+
Partitioning::SingleValuePartitioned(exprs, count) => {
2873+
assert_eq!(count, 1, "Expected 1 partition (single file group)");
2874+
assert_eq!(exprs.len(), 1, "Expected 1 partition expression");
2875+
2876+
let col = exprs[0]
2877+
.as_any()
2878+
.downcast_ref::<Column>()
2879+
.expect("Expected Column expression");
2880+
assert_eq!(col.name(), "f_dkey", "Column name should be f_dkey");
2881+
assert_eq!(
2882+
col.index(),
2883+
1,
2884+
"Column index should be 1 in projected schema [value@0, f_dkey@1]"
2885+
);
2886+
}
2887+
_ => panic!("Expected SingleValuePartitioned, got: {partitioning:?}"),
2888+
}
2889+
}
26762890
}

0 commit comments

Comments (0)