diff --git a/src/distributed_physical_optimizer_rule.rs b/src/distributed_physical_optimizer_rule.rs index 7fc0607..4524a5d 100644 --- a/src/distributed_physical_optimizer_rule.rs +++ b/src/distributed_physical_optimizer_rule.rs @@ -1,4 +1,7 @@ use super::{NetworkShuffleExec, PartitionIsolatorExec, StageExec}; +use crate::errors::distributed_plan_error::{ + get_distribute_plan_err, limit_tasks_err, DistributedPlanError, +}; use crate::execution_plans::NetworkCoalesceExec; use datafusion::common::plan_err; use datafusion::common::tree_node::TreeNodeRecursion; @@ -15,8 +18,6 @@ use datafusion::{ physical_optimizer::PhysicalOptimizerRule, physical_plan::{repartition::RepartitionExec, ExecutionPlan}, }; -use std::error::Error; -use std::fmt::{Display, Formatter}; use std::sync::Arc; use uuid::Uuid; @@ -321,40 +322,6 @@ pub trait NetworkBoundary: ExecutionPlan { } } -/// Error thrown during distributed planning that prompts the planner to change something and -/// try again. -#[derive(Debug)] -enum DistributedPlanError { - /// Prompts the planner to limit the amount of tasks used in the stage that is currently - /// being planned. - LimitTasks(usize), -} - -impl Display for DistributedPlanError { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - match self { - DistributedPlanError::LimitTasks(n) => { - write!(f, "LimitTasksErr: {n}") - } - } - } -} - -impl Error for DistributedPlanError {} - -/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner -/// to try rebuilding the current stage with a limited amount of tasks. -pub fn limit_tasks_err(limit: usize) -> DataFusionError { - DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit))) -} - -fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> { - let DataFusionError::External(err) = err else { - return None; - }; - err.downcast_ref() -} - #[cfg(test)] mod tests { use crate::assert_snapshot; diff --git a/src/errors/distributed_plan_error.rs b/src/errors/distributed_plan_error.rs new file mode 100644 index 0000000..6463aa4 --- /dev/null +++ b/src/errors/distributed_plan_error.rs @@ -0,0 +1,38 @@ +use std::error::Error; +use std::fmt::{Display, Formatter}; + +use datafusion::error::DataFusionError; + +/// Error thrown during distributed planning that prompts the planner to change something and +/// try again. +#[derive(Debug)] +pub enum DistributedPlanError { + /// Prompts the planner to limit the amount of tasks used in the stage that is currently + /// being planned. + LimitTasks(usize), +} + +impl Display for DistributedPlanError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + DistributedPlanError::LimitTasks(n) => { + write!(f, "LimitTasksErr: {n}") + } + } + } +} + +impl Error for DistributedPlanError {} + +/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner +/// to try rebuilding the current stage with a limited amount of tasks. +pub fn limit_tasks_err(limit: usize) -> DataFusionError { + DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit))) +} + +pub fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> { + let DataFusionError::External(err) = err else { + return None; + }; + err.downcast_ref() +} diff --git a/src/errors/mod.rs b/src/errors/mod.rs index e2a76d6..a1692d0 100644 --- a/src/errors/mod.rs +++ b/src/errors/mod.rs @@ -8,6 +8,7 @@ use prost::Message; mod arrow_error; mod datafusion_error; +pub mod distributed_plan_error; mod io_error; mod objectstore_error; mod parquet_error; diff --git a/src/execution_plans/network_coalesce.rs b/src/execution_plans/network_coalesce.rs index f2e047b..f0e8e36 100644 --- a/src/execution_plans/network_coalesce.rs +++ b/src/execution_plans/network_coalesce.rs @@ -1,7 +1,8 @@ use crate::channel_resolver_ext::get_distributed_channel_resolver; use crate::common::scale_partitioning_props; use crate::config_extension_ext::ContextGrpcMetadata; -use crate::distributed_physical_optimizer_rule::{limit_tasks_err, NetworkBoundary}; +use crate::distributed_physical_optimizer_rule::NetworkBoundary; +use crate::errors::distributed_plan_error::limit_tasks_err; use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error}; use crate::execution_plans::{DistributedTaskContext, StageExec}; use crate::flight_service::DoGet; diff --git a/src/execution_plans/partition_isolator.rs b/src/execution_plans/partition_isolator.rs index c5e5b16..77cb9dc 100644 --- a/src/execution_plans/partition_isolator.rs +++ b/src/execution_plans/partition_isolator.rs @@ -1,4 +1,4 @@ -use crate::distributed_physical_optimizer_rule::limit_tasks_err; +use crate::errors::distributed_plan_error::limit_tasks_err; use crate::execution_plans::DistributedTaskContext; use crate::StageExec; use datafusion::common::{exec_err, plan_err};