From 2857b02449eaa6e2008725253f889355445164dc Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 22 Sep 2025 16:39:08 -0400 Subject: [PATCH 1/2] refactor: Move `DistributedPlanError` to error folder --- src/distributed_physical_optimizer_rule.rs | 37 +-------------------- src/errors/distributed_plan_error.rs | 38 ++++++++++++++++++++++ src/errors/mod.rs | 1 + src/execution_plans/network_coalesce.rs | 3 +- src/execution_plans/partition_isolator.rs | 2 +- 5 files changed, 43 insertions(+), 38 deletions(-) create mode 100644 src/errors/distributed_plan_error.rs diff --git a/src/distributed_physical_optimizer_rule.rs b/src/distributed_physical_optimizer_rule.rs index 7fc0607..a6903d6 100644 --- a/src/distributed_physical_optimizer_rule.rs +++ b/src/distributed_physical_optimizer_rule.rs @@ -1,4 +1,5 @@ use super::{NetworkShuffleExec, PartitionIsolatorExec, StageExec}; +use crate::errors::distributed_plan_error::{get_distribute_plan_err, limit_tasks_err, DistributedPlanError}; use crate::execution_plans::NetworkCoalesceExec; use datafusion::common::plan_err; use datafusion::common::tree_node::TreeNodeRecursion; @@ -15,8 +16,6 @@ use datafusion::{ physical_optimizer::PhysicalOptimizerRule, physical_plan::{repartition::RepartitionExec, ExecutionPlan}, }; -use std::error::Error; -use std::fmt::{Display, Formatter}; use std::sync::Arc; use uuid::Uuid; @@ -321,40 +320,6 @@ pub trait NetworkBoundary: ExecutionPlan { } } -/// Error thrown during distributed planning that prompts the planner to change something and -/// try again. -#[derive(Debug)] -enum DistributedPlanError { - /// Prompts the planner to limit the amount of tasks used in the stage that is currently - /// being planned. - LimitTasks(usize), -} - -impl Display for DistributedPlanError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - DistributedPlanError::LimitTasks(n) => { - write!(f, "LimitTasksErr: {n}") - } - } - } -} - -impl Error for DistributedPlanError {} - -/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner -/// to try rebuilding the current stage with a limited amount of tasks. -pub fn limit_tasks_err(limit: usize) -> DataFusionError { - DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit))) -} - -fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> { - let DataFusionError::External(err) = err else { - return None; - }; - err.downcast_ref() -} - #[cfg(test)] mod tests { use crate::assert_snapshot; diff --git a/src/errors/distributed_plan_error.rs b/src/errors/distributed_plan_error.rs new file mode 100644 index 0000000..907c4ac --- /dev/null +++ b/src/errors/distributed_plan_error.rs @@ -0,0 +1,38 @@ +use std::fmt::{Display, Formatter}; +use std::error::Error; + +use datafusion::error::DataFusionError; + +/// Error thrown during distributed planning that prompts the planner to change something and +/// try again. +#[derive(Debug)] +pub enum DistributedPlanError { + /// Prompts the planner to limit the amount of tasks used in the stage that is currently + /// being planned. + LimitTasks(usize), +} + +impl Display for DistributedPlanError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + DistributedPlanError::LimitTasks(n) => { + write!(f, "LimitTasksErr: {n}") + } + } + } +} + +impl Error for DistributedPlanError {} + +/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner +/// to try rebuilding the current stage with a limited amount of tasks. +pub fn limit_tasks_err(limit: usize) -> DataFusionError { + DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit))) +} + +pub fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> { + let DataFusionError::External(err) = err else { + return None; + }; + err.downcast_ref() +} \ No newline at end of file diff --git a/src/errors/mod.rs b/src/errors/mod.rs index e2a76d6..a1692d0 100644 --- a/src/errors/mod.rs +++ b/src/errors/mod.rs @@ -8,6 +8,7 @@ use prost::Message; mod arrow_error; mod datafusion_error; +pub mod distributed_plan_error; mod io_error; mod objectstore_error; mod parquet_error; diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs index f2e047b..f0e8e36 100644 --- a/src/execution_plans/network_coalesce.rs +++ b/src/execution_plans/network_coalesce.rs @@ -1,7 +1,8 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver; use crate::common::scale_partitioning_props; use crate::config_extension_ext::ContextGrpcMetadata; -use crate::distributed_physical_optimizer_rule::{limit_tasks_err, NetworkBoundary}; +use crate::distributed_physical_optimizer_rule::NetworkBoundary; +use crate::errors::distributed_plan_error::limit_tasks_err; use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error}; use crate::execution_plans::{DistributedTaskContext, StageExec}; use crate::flight_service::DoGet; diff --git a/src/execution_plans/partition_isolator.rs b/src/execution_plans/partition_isolator.rs index c5e5b16..77cb9dc 100644 --- a/src/execution_plans/partition_isolator.rs +++ b/src/execution_plans/partition_isolator.rs @@ -1,4 +1,4 @@ -use crate::distributed_physical_optimizer_rule::limit_tasks_err; +use crate::errors::distributed_plan_error::limit_tasks_err; use crate::execution_plans::DistributedTaskContext; use crate::StageExec; use datafusion::common::{exec_err, plan_err}; From b3b21ec79d81862816e6dc9d3c3f8385916e42c6 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 22 Sep 2025 16:39:19 -0400 Subject: [PATCH 2/2] fmt --- src/distributed_physical_optimizer_rule.rs | 4 +++- src/errors/distributed_plan_error.rs | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/src/distributed_physical_optimizer_rule.rs b/src/distributed_physical_optimizer_rule.rs index a6903d6..4524a5d 100644 --- a/src/distributed_physical_optimizer_rule.rs +++ b/src/distributed_physical_optimizer_rule.rs @@ -1,5 +1,7 @@ use super::{NetworkShuffleExec, PartitionIsolatorExec, StageExec}; -use crate::errors::distributed_plan_error::{get_distribute_plan_err, limit_tasks_err, DistributedPlanError}; +use crate::errors::distributed_plan_error::{ + get_distribute_plan_err, limit_tasks_err, DistributedPlanError, +}; use crate::execution_plans::NetworkCoalesceExec; use datafusion::common::plan_err; use datafusion::common::tree_node::TreeNodeRecursion; diff --git a/src/errors/distributed_plan_error.rs b/src/errors/distributed_plan_error.rs index 907c4ac..6463aa4 100644 --- a/src/errors/distributed_plan_error.rs +++ b/src/errors/distributed_plan_error.rs @@ -1,5 +1,5 @@ -use std::fmt::{Display, Formatter}; use std::error::Error; +use std::fmt::{Display, Formatter}; use datafusion::error::DataFusionError; @@ -35,4 +35,4 @@ pub fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlan return None; }; err.downcast_ref() -} \ No newline at end of file +}