Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 3 additions & 36 deletions src/distributed_physical_optimizer_rule.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use super::{NetworkShuffleExec, PartitionIsolatorExec, StageExec};
use crate::errors::distributed_plan_error::{
get_distribute_plan_err, limit_tasks_err, DistributedPlanError,
};
use crate::execution_plans::NetworkCoalesceExec;
use datafusion::common::plan_err;
use datafusion::common::tree_node::TreeNodeRecursion;
Expand All @@ -15,8 +18,6 @@ use datafusion::{
physical_optimizer::PhysicalOptimizerRule,
physical_plan::{repartition::RepartitionExec, ExecutionPlan},
};
use std::error::Error;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
use uuid::Uuid;

Expand Down Expand Up @@ -321,40 +322,6 @@ pub trait NetworkBoundary: ExecutionPlan {
}
}

/// Error thrown during distributed planning that prompts the planner to change something and
/// try again.
#[derive(Debug)]
enum DistributedPlanError {
/// Prompts the planner to limit the amount of tasks used in the stage that is currently
/// being planned.
LimitTasks(usize),
}

impl Display for DistributedPlanError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DistributedPlanError::LimitTasks(n) => {
write!(f, "LimitTasksErr: {n}")
}
}
}
}

impl Error for DistributedPlanError {}

/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner
/// to try rebuilding the current stage with a limited amount of tasks.
pub fn limit_tasks_err(limit: usize) -> DataFusionError {
DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
}

fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
let DataFusionError::External(err) = err else {
return None;
};
err.downcast_ref()
}

#[cfg(test)]
mod tests {
use crate::assert_snapshot;
Expand Down
38 changes: 38 additions & 0 deletions src/errors/distributed_plan_error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
use std::error::Error;
use std::fmt::{Display, Formatter};

use datafusion::error::DataFusionError;

/// Error thrown during distributed planning that prompts the planner to change something and
/// try again.
#[derive(Debug)]
pub enum DistributedPlanError {
/// Prompts the planner to limit the amount of tasks used in the stage that is currently
/// being planned.
LimitTasks(usize),
}

impl Display for DistributedPlanError {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
DistributedPlanError::LimitTasks(n) => {
write!(f, "LimitTasksErr: {n}")
}
}
}
}

impl Error for DistributedPlanError {}

/// Builds a [DistributedPlanError::LimitTasks] error. This error prompts the distributed planner
/// to try rebuilding the current stage with a limited amount of tasks.
pub fn limit_tasks_err(limit: usize) -> DataFusionError {
DataFusionError::External(Box::new(DistributedPlanError::LimitTasks(limit)))
}

pub fn get_distribute_plan_err(err: &DataFusionError) -> Option<&DistributedPlanError> {
let DataFusionError::External(err) = err else {
return None;
};
err.downcast_ref()
}
1 change: 1 addition & 0 deletions src/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use prost::Message;

mod arrow_error;
mod datafusion_error;
pub mod distributed_plan_error;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if this looked confusing, the errors module here is not really a collection of errors used along the project (as it usually happens in Rust projects). These are meant to be just Protobuf variants for upstream's DataFusionError struct rather than actual errors that can be thrown, so that means probably DistributedPlanError should not belong here.

That being said, we probably should move this errors folder inside of the protobuf now that we have one, that would scope things better and avoid confusion.

About whether having an errors module with an actual collection of errors (with just the DistributedPlanError for now), or just leaving the DistributedPlanError where it is, no strong opinion.

mod io_error;
mod objectstore_error;
mod parquet_error;
Expand Down
3 changes: 2 additions & 1 deletion src/execution_plans/network_coalesce.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use crate::channel_resolver_ext::get_distributed_channel_resolver;
use crate::common::scale_partitioning_props;
use crate::config_extension_ext::ContextGrpcMetadata;
use crate::distributed_physical_optimizer_rule::{limit_tasks_err, NetworkBoundary};
use crate::distributed_physical_optimizer_rule::NetworkBoundary;
use crate::errors::distributed_plan_error::limit_tasks_err;
use crate::errors::{map_flight_to_datafusion_error, map_status_to_datafusion_error};
use crate::execution_plans::{DistributedTaskContext, StageExec};
use crate::flight_service::DoGet;
Expand Down
2 changes: 1 addition & 1 deletion src/execution_plans/partition_isolator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::distributed_physical_optimizer_rule::limit_tasks_err;
use crate::errors::distributed_plan_error::limit_tasks_err;
use crate::execution_plans::DistributedTaskContext;
use crate::StageExec;
use datafusion::common::{exec_err, plan_err};
Expand Down