From fd2d835855dabeb2e4ec09954a6b0e7b18e2e826 Mon Sep 17 00:00:00 2001 From: Connor Tsui Date: Thu, 24 Apr 2025 14:33:15 -0400 Subject: [PATCH] remove optimizer --- optd/src/core/mod.rs | 1 - optd/src/core/optimizer/egest.rs | 201 --------- optd/src/core/optimizer/handlers.rs | 405 ------------------- optd/src/core/optimizer/ingest.rs | 198 --------- optd/src/core/optimizer/jobs.rs | 451 --------------------- optd/src/core/optimizer/memo.rs | 1 - optd/src/core/optimizer/merge.rs | 198 --------- optd/src/core/optimizer/mod.rs | 281 ------------- optd/src/core/optimizer/subscriptions.rs | 205 ---------- optd/src/core/optimizer/tasks.rs | 494 ----------------------- 10 files changed, 2435 deletions(-) delete mode 100644 optd/src/core/optimizer/egest.rs delete mode 100644 optd/src/core/optimizer/handlers.rs delete mode 100644 optd/src/core/optimizer/ingest.rs delete mode 100644 optd/src/core/optimizer/jobs.rs delete mode 100644 optd/src/core/optimizer/memo.rs delete mode 100644 optd/src/core/optimizer/merge.rs delete mode 100644 optd/src/core/optimizer/mod.rs delete mode 100644 optd/src/core/optimizer/subscriptions.rs delete mode 100644 optd/src/core/optimizer/tasks.rs diff --git a/optd/src/core/mod.rs b/optd/src/core/mod.rs index 781ae5ed..a9ce3c10 100644 --- a/optd/src/core/mod.rs +++ b/optd/src/core/mod.rs @@ -2,4 +2,3 @@ pub mod bridge; pub mod cir; pub mod error; pub mod memo; -pub mod optimizer; diff --git a/optd/src/core/optimizer/egest.rs b/optd/src/core/optimizer/egest.rs deleted file mode 100644 index 93965820..00000000 --- a/optd/src/core/optimizer/egest.rs +++ /dev/null @@ -1,201 +0,0 @@ -use std::sync::Arc; - -use async_recursion::async_recursion; -use futures::future::try_join_all; - -use crate::core::{ - cir::{Child, GoalMemberId, Operator, PartialPhysicalPlan, PhysicalExpressionId, PhysicalPlan}, - error::Error, - memo::Memoize, -}; - -use super::Optimizer; - -impl Optimizer { - /// Recursively transforms a physical expression ID in the memo into a complete physical plan. - /// - /// This function retrieves the physical expression from the memo and recursively - /// transforms any child goal members into their corresponding best physical plans. - /// - /// # Parameters - /// * `expression_id` - ID of the physical expression to transform into a complete plan. - /// - /// # Returns - /// * `Ok(Some(PhysicalPlan))` if all child plans were successfully constructed from their IDs. - /// * `Ok(None)` if any goal ID lacks a best expression ID. - /// * `Err(Error)` if a memo operation fails. - #[async_recursion] - pub(super) async fn egest_best_plan( - &self, - expression_id: PhysicalExpressionId, - ) -> Result, Error> { - let expression = self.memo.materialize_physical_expr(expression_id).await?; - - let child_results = try_join_all( - expression - .children - .iter() - .map(|child| self.egest_child_plan(child)), - ) - .await?; - - let child_plans = match child_results.into_iter().collect::>>() { - Some(plans) => plans, - None => return Ok(None), - }; - - Ok(Some(PhysicalPlan(Operator { - tag: expression.tag, - data: expression.data, - children: child_plans, - }))) - } - - /// Converts a physical expression ID to a partial physical plan. - /// - /// This method materializes the expression and recursively processes its children, - /// preserving goal references as unmaterialized plans. - /// - /// # Parameters - /// * `expression_id` - ID of the physical expression to convert to a partial plan. - /// - /// # Returns - /// * `PartialPhysicalPlan` - The materialized partial plan. - /// * `Err(Error)` if a memo operation fails. - pub(super) async fn egest_partial_plan( - &self, - expression_id: PhysicalExpressionId, - ) -> Result { - let expression = self.memo.materialize_physical_expr(expression_id).await?; - - let children = try_join_all( - expression - .children - .iter() - .map(|child| self.egest_partial_child(child.clone())), - ) - .await?; - - Ok(PartialPhysicalPlan::Materialized(Operator { - tag: expression.tag, - data: expression.data, - children, - })) - } - - async fn egest_child_plan( - &self, - child: &Child, - ) -> Result>>, Error> { - match child { - Child::Singleton(member) => { - let plan = match self.process_goal_member(*member).await? { - Some(plan) => plan, - None => return Ok(None), - }; - Ok(Some(Child::Singleton(plan.into()))) - } - Child::VarLength(members) => { - let futures = members.iter().map(|member| async move { - let plan = match self.process_goal_member(*member).await? { - Some(plan) => plan, - None => return Ok(None), - }; - Ok(Some(plan.into())) - }); - - let result_plans = match try_join_all(futures).await?.into_iter().collect() { - Some(plans) => plans, - None => return Ok(None), - }; - - Ok(Some(Child::VarLength(result_plans))) - } - } - } - - async fn process_goal_member( - &self, - member: GoalMemberId, - ) -> Result, Error> { - match member { - GoalMemberId::PhysicalExpressionId(expr_id) => self.egest_best_plan(expr_id).await, - GoalMemberId::GoalId(goal_id) => { - let (best_expr_id, _) = - match self.memo.get_best_optimized_physical_expr(goal_id).await? { - Some(expr) => expr, - None => return Ok(None), - }; - - self.egest_best_plan(best_expr_id).await - } - } - } - - async fn egest_partial_child( - &self, - child: Child, - ) -> Result>, Error> { - match child { - Child::Singleton(member) => match member { - GoalMemberId::GoalId(goal_id) => { - let goal = self.memo.materialize_goal(goal_id).await?; - Ok(Child::Singleton( - PartialPhysicalPlan::UnMaterialized(goal).into(), - )) - } - GoalMemberId::PhysicalExpressionId(expr_id) => { - let expr = self.memo.materialize_physical_expr(expr_id).await?; - - let children = try_join_all( - expr.children - .iter() - .map(|child| self.egest_partial_child(child.clone())), - ) - .await?; - - let op = Operator { - tag: expr.tag, - data: expr.data, - children, - }; - - Ok(Child::Singleton( - PartialPhysicalPlan::Materialized(op).into(), - )) - } - }, - Child::VarLength(members) => { - let goals = try_join_all(members.into_iter().map(|member| async move { - match member { - GoalMemberId::GoalId(goal_id) => { - let goal = self.memo.materialize_goal(goal_id).await?; - Ok(PartialPhysicalPlan::UnMaterialized(goal).into()) - } - GoalMemberId::PhysicalExpressionId(expr_id) => { - let expr = self.memo.materialize_physical_expr(expr_id).await?; - - let children = try_join_all( - expr.children - .iter() - .map(|child| self.egest_partial_child(child.clone())), - ) - .await?; - - let op = Operator { - tag: expr.tag, - data: expr.data, - children, - }; - - Ok(PartialPhysicalPlan::Materialized(op).into()) - } - } - })) - .await?; - - Ok(Child::VarLength(goals)) - } - } - } -} diff --git a/optd/src/core/optimizer/handlers.rs b/optd/src/core/optimizer/handlers.rs deleted file mode 100644 index 24d2d856..00000000 --- a/optd/src/core/optimizer/handlers.rs +++ /dev/null @@ -1,405 +0,0 @@ -use super::{ - JobId, OptimizeRequest, Optimizer, OptimizerMessage, PendingMessage, TaskId, - ingest::LogicalIngest, - jobs::JobKind, - tasks::{CostExpressionTask, ImplementExpressionTask, TaskKind, TransformExpressionTask}, -}; -use crate::core::{ - cir::{ - Cost, Goal, GoalId, GoalMemberId, GroupId, LogicalExpressionId, LogicalPlan, - LogicalProperties, PartialLogicalPlan, PartialPhysicalPlan, PhysicalExpressionId, - PhysicalPlan, PhysicalProperties, - }, - error::Error, - memo::Memoize, -}; -use crate::dsl::{ - analyzer::hir::Value, - engine::{Continuation, EngineResponse}, -}; -use JobKind::*; -use LogicalIngest::*; -use OptimizerMessage::*; -use TaskKind::*; -use futures::{SinkExt, channel::mpsc::Sender}; - -impl Optimizer { - /// This method initiates the optimization process for a logical plan by launching - /// an optimization task. It may need dependencies. - /// - /// # Parameters - /// * `plan` - The logical plan to optimize. - /// * `response_tx` - Channel to send the resulting physical plan. - /// * `task_id` - ID of the task that initiated this request. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during processing. - pub(super) async fn process_optimize_request( - &mut self, - plan: LogicalPlan, - response_tx: Sender, - task_id: TaskId, - ) -> Result<(), Error> { - match self.probe_ingest_logical_plan(&plan.clone().into()).await? { - Found(group_id) => { - // The goal represents what we want to achieve: optimize the root group - // with no specific physical properties required. - let goal = Goal(group_id, PhysicalProperties(None)); - let goal_id = self.memo.get_goal_id(&goal).await?; - - // This ensures the task will be notified when optimized expressions - // for this goal are found. - self.subscribe_task_to_goal(goal_id, task_id).await?; - } - Missing(logical_exprs) => { - // Store the request as a pending message that will be processed - // once all create task dependencies are resolved. - let pending_dependencies = logical_exprs - .iter() - .cloned() - .map(|logical_expr_id| { - self.schedule_job(task_id, DeriveLogicalProperties(logical_expr_id)) - }) - .collect(); - - let pending_message = PendingMessage { - message: OptimizeRequestWrapper(OptimizeRequest { plan, response_tx }, task_id), - pending_dependencies, - }; - - self.pending_messages.push(pending_message); - } - } - - Ok(()) - } - - /// This method handles new logical plan alternatives discovered through - /// transformation rule application. - /// - /// # Parameters - /// * `plan` - The partial logical plan to process. - /// * `group_id` - ID of the group associated with this plan. - /// * `job_id` - ID of the job that generated this plan. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during processing. - pub(super) async fn process_new_logical_partial( - &mut self, - plan: PartialLogicalPlan, - group_id: GroupId, - job_id: JobId, - ) -> Result<(), Error> { - let group_id = self.memo.find_repr_group(group_id).await?; - match self.probe_ingest_logical_plan(&plan).await? { - Found(new_group_id) if new_group_id != group_id => { - // Atomically perform the merge in the memo and process all results. - let merge_results = self.memo.merge_groups(group_id, new_group_id).await?; - self.handle_merge_result(merge_results).await?; - } - Found(_) => { - // Group already exists, nothing to merge or do. - } - Missing(logical_exprs) => { - // Store the request as a pending message that will be processed - // once all create task dependencies are resolved. - let related_task_id = self.running_jobs[&job_id].0; - let pending_dependencies = logical_exprs - .iter() - .cloned() - .map(|logical_expr_id| { - self.schedule_job(related_task_id, DeriveLogicalProperties(logical_expr_id)) - }) - .collect(); - - let pending_message = PendingMessage { - message: NewLogicalPartial(plan, group_id, job_id), - pending_dependencies, - }; - - self.pending_messages.push(pending_message); - } - } - - Ok(()) - } - - /// This method handles new physical implementations discovered through - /// implementation rule application. - /// - /// # Parameters - /// * `plan` - The partial physical plan to process. - /// * `goal_id` - ID of the goal associated with this plan. - /// * `job_id` - ID of the job that generated this plan. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during processing. - pub(super) async fn process_new_physical_partial( - &mut self, - plan: PartialPhysicalPlan, - goal_id: GoalId, - job_id: JobId, - ) -> Result<(), Error> { - let goal_id = self.memo.find_repr_goal(goal_id).await?; - - let member = self.probe_ingest_physical_plan(&plan).await?; - let is_new = self.memo.add_goal_member(goal_id, member).await?; - - if is_new { - match member { - GoalMemberId::PhysicalExpressionId(_expression_id) => { - let _parent_task_id = self.running_jobs[&job_id].0; - // TODO(Alexis): Needs to ensure cost expression task exists and then subs. - } - GoalMemberId::GoalId(_) => { - // TODO(Alexis); Need to launch a new implement - } - } - } - - Ok(()) - } - - /// This method handles fully optimized physical expressions with cost information. - /// - /// When a new optimized expression is found, it's added to the memo. If it becomes - /// the new best expression for its goal, continuations are notified and and clients - /// receive the corresponding egested plan. - /// - /// # Parameters - /// * `expression_id` - ID of the physical expression to process. - /// * `cost` - Cost information for the expression. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during processing. - pub(super) async fn process_new_costed_physical( - &mut self, - expression_id: PhysicalExpressionId, - cost: Cost, - ) -> Result<(), Error> { - let expression_id = self.memo.find_repr_physical_expr(expression_id).await?; - let new_best = self - .memo - .update_physical_expr_cost(expression_id, cost) - .await?; - - // If this is the new best expression found so far for this goal, - // schedule continuation jobs for all subscribers and send to clients. - if new_best { - // TODO(Alexis): Needs to send to parents. - // self.schedule_optimized_continuations(goal_id, expression_id, cost); - // self.egest_to_subscribers(goal_id, expression_id).await?; - } - - Ok(()) - } - - /// This method handles group creation for expressions with derived properties - /// and updates any pending messages that depend on this group. - /// - /// # Parameters - /// * `expression_id` - ID of the logical expression to create a group for. - /// * `properties` - Logical properties associated with the expression. - /// * `job_id` - ID of the job that initiated this request. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during processing. - pub(super) async fn process_create_group( - &mut self, - expression_id: LogicalExpressionId, - properties: &LogicalProperties, - job_id: JobId, - ) -> Result<(), Error> { - self.memo.create_group(expression_id, properties).await?; - self.resolve_dependencies(job_id).await; - Ok(()) - } - - /// Registers a continuation for receiving logical expressions from a group. - /// The continuation will receive notifications about both existing and new expressions. - /// - /// # Parameters - /// * `group_id` - ID of the group to subscribe to. - /// * `continuation` - Continuation to call when new expressions are found. - /// * `job_id` - ID of the job that initiated this request. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during processing. - pub(super) async fn process_group_subscription( - &mut self, - group_id: GroupId, - continuation: Continuation>, - job_id: JobId, - ) -> Result<(), Error> { - let related_task_id = self.running_jobs[&job_id].0; - - // Register the continuation and notify the memo about the dependency to ensure the - // operation corresponding to the task gets invalidated when the group has new expressions. - match &mut self.tasks.get_mut(&related_task_id).unwrap().kind { - TransformExpression(TransformExpressionTask { - rule, - expression_id, - continuations, - .. - }) => { - continuations - .entry(group_id) - .or_default() - .push(continuation.clone()); - - self.memo - .add_transformation_dependency(*expression_id, rule, group_id) - .await?; - } - ImplementExpression(ImplementExpressionTask { - rule, - expression_id, - goal_id, - continuations, - .. - }) => { - continuations - .entry(group_id) - .or_default() - .push(continuation.clone()); - - self.memo - .add_implementation_dependency(*expression_id, *goal_id, rule, group_id) - .await?; - } - _ => panic!("Task type cannot produce group subscription."), - } - - // Subscribe to future expressions and bootstrap with existing ones. - let expressions = self - .subscribe_task_to_group(group_id, related_task_id) - .await?; - - for expression_id in expressions { - self.schedule_job( - related_task_id, - ContinueWithLogical(expression_id, continuation.clone()), - ); - } - - Ok(()) - } - - /// Registers a continuation for receiving optimized physical expressions for a goal. - /// The continuation will be notified about the best existing expression and any better ones found. - /// - /// # Parameters - /// * `goal` - The goal to subscribe to. - /// * `continuation` - Continuation to call when new optimized expressions are found. - /// * `job_id` - ID of the job that initiated this request. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during processing. - pub(super) async fn process_goal_subscription( - &mut self, - goal: &Goal, - continuation: Continuation>, - job_id: JobId, - ) -> Result<(), Error> { - let related_task_id = self.running_jobs[&job_id].0; - let goal_id = self.memo.get_goal_id(goal).await?; - - // Register the continuation and notify the memo about the dependency to ensure the - // operation corresponding to the task gets invalidated when the goal has a new optimum. - match &mut self.tasks.get_mut(&related_task_id).unwrap().kind { - CostExpression(CostExpressionTask { - expression_id, - continuations, - .. - }) => { - continuations - .entry(goal_id) - .or_default() - .push(continuation.clone()); - - self.memo - .add_cost_dependency(*expression_id, goal_id) - .await?; - } - _ => panic!("Only cost tasks can subscribe to goals."), - } - - // Subscribe to future optimized expressions and bootstrap with current best. - if let Some((best_expr_id, cost)) = self - .subscribe_task_to_goal(goal_id, related_task_id) - .await? - { - self.schedule_job( - related_task_id, - ContinueWithCostedPhysical(best_expr_id, cost, continuation), - ); - } - - Ok(()) - } - - /// Retrieves the logical properties for the given group from the memo - /// and sends them back to the requestor through the provided oneshot channel. - /// - /// # Parameters - /// * `group_id` - ID of the group to retrieve properties for. - /// * `sender` - Channel to send the properties through. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during processing. - pub(super) async fn process_retrieve_properties( - &mut self, - group_id: GroupId, - mut sender: Sender, - ) -> Result<(), Error> { - let props = self.memo.get_logical_properties(group_id).await?; - - // We don't want to make a job out of this, as it is merely a way to unblock - // an existing pending job. We send it to the channel without blocking the - // main co-routine. - tokio::spawn(async move { - sender - .send(props) - .await - .expect("Failed to send properties - channel closed."); - }); - - Ok(()) - } - - /// Helper method to resolve dependencies after a group creation job completes. - /// - /// This method is called when a group creation job completes. It updates all - /// pending messages that were waiting for this job and processes any that - /// are now ready (have no more pending dependencies). - /// - /// # Parameters - /// * `completed_job_id` - ID of the completed job. - async fn resolve_dependencies(&mut self, completed_job_id: JobId) { - // Update dependencies and collect ready messages. - let ready_indices: Vec<_> = self - .pending_messages - .iter_mut() - .enumerate() - .filter_map(|(i, pending)| { - pending.pending_dependencies.remove(&completed_job_id); - pending.pending_dependencies.is_empty().then_some(i) - }) - .collect(); - - // Process all ready messages (in reverse order to avoid index issues when removing). - for i in ready_indices.iter().rev() { - let pending = self.pending_messages.swap_remove(*i); - - // Re-send the message to be processed in a new co-routine to not block the - // main co-routine. - let mut message_tx = self.message_tx.clone(); - tokio::spawn(async move { - message_tx - .send(pending.message) - .await - .expect("Failed to re-send ready message - channel closed."); - }); - } - } -} diff --git a/optd/src/core/optimizer/ingest.rs b/optd/src/core/optimizer/ingest.rs deleted file mode 100644 index 5f6d6462..00000000 --- a/optd/src/core/optimizer/ingest.rs +++ /dev/null @@ -1,198 +0,0 @@ -use super::Optimizer; -use crate::core::{ - cir::{ - Child, GoalMemberId, GroupId, LogicalExpression, LogicalExpressionId, Operator, - PartialLogicalPlan, PartialPhysicalPlan, PhysicalExpression, - }, - error::Error, - memo::Memoize, -}; -use Child::*; -use LogicalIngest::*; -use async_recursion::async_recursion; -use std::sync::Arc; - -/// Result type for logical plan probing. -pub(super) enum LogicalIngest { - /// Plan was successfully found in the memo. - Found(GroupId), - /// Plan requires groups to be created for missing expressions. - Missing(Vec), -} - -impl Optimizer { - /// Probes for a logical plan in the memo. - /// - /// Logical plans need property derivation which occurs asynchronously - /// in separate jobs after this function returns. This separation lets - /// the optimizer continue processing while properties are computed - /// in the background. - /// - /// # Returns - /// - `Found(group_id)`: Plan exists in the memo. - /// - `Missing(expressions)`: Expressions needing property derivation. - pub(super) async fn probe_ingest_logical_plan( - &mut self, - logical_plan: &PartialLogicalPlan, - ) -> Result { - match logical_plan { - PartialLogicalPlan::Materialized(operator) => { - self.probe_ingest_logical_operator(operator).await - } - PartialLogicalPlan::UnMaterialized(group_id) => Ok(Found(*group_id)), - } - } - - /// Probes for a physical plan in the memo. - /// - /// Recursively processes the physical plan to obtain the goal member identifiers - /// throughout the tree. Unlike logical plans, this doesn't require - /// asynchronous property derivation. - /// - /// # Parameters - /// * `physical_plan` - The physical plan to probe. - /// - /// # Returns - /// The goal member identifier corresponding to the physical plan, - /// which can be either a physical expression ID or a goal ID. - #[async_recursion] - pub(super) async fn probe_ingest_physical_plan( - &mut self, - physical_plan: &PartialPhysicalPlan, - ) -> Result { - match physical_plan { - PartialPhysicalPlan::Materialized(operator) => { - self.probe_ingest_physical_operator(operator).await - } - PartialPhysicalPlan::UnMaterialized(goal) => { - // Base case: is a goal. - let goal_id = self.memo.get_goal_id(goal).await?; - Ok(GoalMemberId::GoalId(goal_id)) - } - } - } - - async fn probe_ingest_logical_operator( - &mut self, - operator: &Operator>, - ) -> Result { - // Sequentially process the children ingestion. - let mut children_results = Vec::with_capacity(operator.children.len()); - for child in &operator.children { - let result = self.probe_ingest_logical_child(child).await?; - children_results.push(result); - } - - // Collect *all* missing expressions from children in order to reduce - // the number of times the optimizer needs to probe the ingestion. - let mut missing_expressions = Vec::new(); - let children = children_results - .into_iter() - .map(|child_result| match child_result { - Singleton(Found(group_id)) => Singleton(group_id), - Singleton(Missing(expressions)) => { - missing_expressions.extend(expressions); - Singleton(GroupId(0)) // Placeholder. - } - VarLength(results) => { - let group_ids = results - .into_iter() - .map(|result| match result { - Found(group_id) => group_id, - Missing(expressions) => { - missing_expressions.extend(expressions); - GroupId(0) // Placeholder. - } - }) - .collect(); - VarLength(group_ids) - } - }) - .collect(); - - // If any children have missing expressions, return those. - if !missing_expressions.is_empty() { - return Ok(Missing(missing_expressions)); - } - - let logical_expression = LogicalExpression { - tag: operator.tag.clone(), - data: operator.data.clone(), - children, - }; - let logical_expression_id = self.memo.get_logical_expr_id(&logical_expression).await?; - - // Base case: check if the expression already exists in the memo. - match self - .memo - .find_logical_expr_group(logical_expression_id) - .await? - { - Some(group_id) => Ok(Found(group_id)), - None => Ok(Missing(vec![logical_expression_id])), - } - } - - #[async_recursion] - async fn probe_ingest_logical_child( - &mut self, - child: &Child>, - ) -> Result, Error> { - match child { - Singleton(plan) => { - let result = self.probe_ingest_logical_plan(plan).await?; - Ok(Singleton(result)) - } - VarLength(plans) => { - let mut results = Vec::with_capacity(plans.len()); - for plan in plans { - let result = self.probe_ingest_logical_plan(plan).await?; - results.push(result); - } - Ok(VarLength(results)) - } - } - } - - async fn probe_ingest_physical_operator( - &mut self, - operator: &Operator>, - ) -> Result { - // Sequentially process the children ingestion. - let mut children = Vec::with_capacity(operator.children.len()); - for child in &operator.children { - let processed_child = self.process_physical_child(child).await?; - children.push(processed_child); - } - - // Base case: is an expression. - let expression = PhysicalExpression { - tag: operator.tag.clone(), - data: operator.data.clone(), - children, - }; - let expression_id = self.memo.get_physical_expr_id(&expression).await?; - - Ok(GoalMemberId::PhysicalExpressionId(expression_id)) - } - - async fn process_physical_child( - &mut self, - child: &Child>, - ) -> Result, Error> { - match child { - Singleton(plan) => { - let member = self.probe_ingest_physical_plan(plan).await?; - Ok(Singleton(member)) - } - VarLength(plans) => { - let mut members = Vec::with_capacity(plans.len()); - for plan in plans { - let member = self.probe_ingest_physical_plan(plan).await?; - members.push(member); - } - Ok(VarLength(members)) - } - } - } -} diff --git a/optd/src/core/optimizer/jobs.rs b/optd/src/core/optimizer/jobs.rs deleted file mode 100644 index cca703c9..00000000 --- a/optd/src/core/optimizer/jobs.rs +++ /dev/null @@ -1,451 +0,0 @@ -use super::Task; -use super::tasks::{ImplementExpressionTask, TaskKind, TransformExpressionTask}; -use super::{Optimizer, OptimizerMessage, tasks::TaskId}; -use crate::core::bridge::from_cir::{ - costed_physical_to_value, partial_logical_to_value, partial_physical_to_value, - physical_properties_to_value, -}; -use crate::core::bridge::into_cir::{ - hir_goal_to_cir, hir_group_id_to_cir, value_to_cost, value_to_logical_properties, - value_to_partial_logical, value_to_partial_physical, -}; -use crate::core::cir::{ - Cost, Goal, GoalId, GroupId, ImplementationRule, LogicalExpressionId, PartialLogicalPlan, - PhysicalExpressionId, TransformationRule, -}; -use crate::core::error::Error; -use crate::core::memo::Memoize; -use crate::dsl::analyzer::hir::Value; -use crate::dsl::engine::{Continuation, Engine, EngineResponse}; -use JobKind::*; -use OptimizerMessage::*; -use TaskKind::*; -use futures::SinkExt; -use futures::channel::mpsc::Sender; -use std::sync::Arc; - -/// Unique identifier for jobs in the optimization system. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub(super) struct JobId(pub i64); - -/// A job represents a discrete unit of work within the optimization process. -/// -/// Jobs are launched by tasks and represent atomic operations that contribute to -/// completing the task. Multiple jobs may be launched by a single task, and all -/// jobs must complete before a task is considered (temporarily) finished. -#[derive(Clone)] -pub(super) struct Job(pub TaskId, pub JobKind); - -/// Enumeration of different types of jobs in the optimizer. -/// -/// Each variant represents a specific optimization operation that can be -/// performed asynchronously and independently. -#[derive(Clone)] -pub(super) enum JobKind { - /// Derives logical properties for a logical expression. - /// - /// This job computes schema, cardinality estimates, and other - /// statistical properties of a logical expression. - DeriveLogicalProperties(LogicalExpressionId), - - /// Starts applying a transformation rule to a logical expression. - /// - /// This job generates alternative logical expressions that are - /// semantically equivalent to the original. - StartTransformationRule(TransformationRule, LogicalExpressionId, GroupId), - - /// Starts applying an implementation rule to a logical expression and properties. - /// - /// This job generates physical implementations of a logical expression - /// based on specific implementation strategies. - StartImplementationRule(ImplementationRule, LogicalExpressionId, GoalId), - - /// Starts computing the cost of a physical expression. - /// - /// This job estimates the execution cost of a physical implementation - /// to aid in selecting the optimal plan. - StartCostExpression(PhysicalExpressionId), - - /// Continues processing with a logical expression result. - /// - /// This job represents a continuation-passing-style callback for - /// handling the result of a logical expression operation. - ContinueWithLogical( - LogicalExpressionId, - Continuation>, - ), - - /// Continues processing with an optimized expression result. - /// - /// This job represents a continuation-passing-style callback for - /// handling the result of an optimized physical expression operation. - ContinueWithCostedPhysical( - PhysicalExpressionId, - Cost, - Continuation>, - ), -} - -impl Optimizer { - // - // Job Scheduling and Management - // - - /// Schedules a new job and associates it with a task. - /// - /// This method creates a job of the specified kind, associates it with - /// the given task, adds it to the pending jobs collection, and updates - /// the task's uncompleted jobs set. - /// - /// # Parameters - /// * `task_id` - The ID of the task that's launching this job. - /// * `kind` - The kind of job to create. - /// - /// # Returns - /// * The ID of the created job. - pub(super) fn schedule_job(&mut self, task_id: TaskId, kind: JobKind) -> JobId { - // Generate a new job ID. - let job_id = self.next_job_id; - self.next_job_id.0 += 1; - - // Create & schedule the job. - let job = Job(task_id, kind); - self.pending_jobs.insert(job_id, job); - self.job_schedule_queue.push_back(job_id); - - // Add job to task's uncompleted jobs set. - self.tasks - .get_mut(&task_id) - .unwrap() - .uncompleted_jobs - .insert(job_id); - - job_id - } - - /// Launches all pending jobs until either the maximum concurrent job limit is - /// reached or there are no more jobs to launch. - /// - /// Jobs are launched in FIFO order from the job schedule queue if the number - /// of currently running jobs is below the maximum concurrent jobs limit. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during job launching. - pub(super) async fn launch_pending_jobs(&mut self) -> Result<(), Error> { - // Launch jobs only if we're below the maximum concurrent jobs limit, in FIFO order. - while self.running_jobs.len() < self.max_concurrent_jobs - && !self.job_schedule_queue.is_empty() - { - let job_id = self.job_schedule_queue.pop_front().unwrap(); - - // Move the job from pending to running. - let job = self.pending_jobs.remove(&job_id).unwrap(); - self.running_jobs.insert(job_id, job.clone()); - - // Dispatch & execute the job in a new co-routine. - match job.1 { - DeriveLogicalProperties(logical_expr_id) => { - self.derive_logical_properties(logical_expr_id, job_id) - .await?; - } - StartTransformationRule(rule_name, logical_expr_id, group_id) => { - self.execute_transformation_rule(rule_name, logical_expr_id, group_id, job_id) - .await?; - } - StartImplementationRule(rule_name, expression_id, goal_id) => { - self.execute_implementation_rule(rule_name, expression_id, goal_id, job_id) - .await?; - } - StartCostExpression(expression_id) => { - self.execute_cost_expression(expression_id, job_id).await?; - } - ContinueWithLogical(logical_expr_id, k) => { - self.execute_continue_with_logical(logical_expr_id, k) - .await?; - } - ContinueWithCostedPhysical(expression_id, cost, k) => { - self.execute_continue_with_optimized(expression_id, cost, k) - .await?; - } - } - } - - Ok(()) - } - - /// Marks a job as completed and updates related task status. - /// - /// This method removes the job from running jobs, updates the task's - /// uncompleted jobs set, and marks the task as clean if it has no more - /// uncompleted jobs. - /// - /// # Parameters - /// * `job_id` - The ID of the job to mark as completed. - /// - /// # Returns - /// * `Result<(), Error>` - Success or error during job completion. - pub(super) async fn complete_job(&mut self, job_id: JobId) -> Result<(), Error> { - // Remove the job from the running jobs. - let Job(task_id, _) = self.running_jobs.remove(&job_id).unwrap(); - - // Remove the job from the task's uncompleted jobs set. - let task = self.tasks.get_mut(&task_id).unwrap(); - task.uncompleted_jobs.remove(&job_id); - - // If the task has no uncompleted jobs, mark it as clean. - if task.uncompleted_jobs.is_empty() { - match &task.kind { - ImplementExpression(ImplementExpressionTask { - rule, - expression_id, - goal_id, - .. - }) => { - self.memo - .set_implementation_clean(*expression_id, *goal_id, rule) - .await?; - } - TransformExpression(TransformExpressionTask { - expression_id, - rule, - .. - }) => { - self.memo - .set_transformation_clean(*expression_id, rule) - .await?; - } - CostExpression(task) => { - self.memo.set_cost_clean(task.expression_id).await?; - } - _ => {} // We don't track status for the other task kinds. - } - } - - // TODO(Alexis): Cleanup the parentless tasks to free up resources. - - Ok(()) - } - - /// Retrieves the task associated with a specific job. - /// - /// # Parameters - /// * `job_id` - The ID of the job to find the related task for. - /// - /// # Returns - /// * `Option<&Task>` - The task associated with the job, if found. - pub(super) fn get_related_task(&self, job_id: JobId) -> Option<&Task> { - let Job(task_id, _) = self.running_jobs.get(&job_id).unwrap(); - self.tasks.get(task_id) - } - - pub(super) async fn send_engine_response( - job_id: JobId, - mut message_tx: Sender, - response: EngineResponse, - ) { - match response { - EngineResponse::Return(value, k) => message_tx.send(k(value).await).await.unwrap(), - EngineResponse::YieldGroup(group_id, k) => message_tx - .send(SubscribeGroup(hir_group_id_to_cir(&group_id), k, job_id)) - .await - .unwrap(), - EngineResponse::YieldGoal(goal, k) => message_tx - .send(SubscribeGoal(hir_goal_to_cir(&goal), k, job_id)) - .await - .unwrap(), - } - } - - // - // Job Execution Methods - // - - /// Executes a job to derive logical properties for a logical expression. - /// - /// This creates an engine instance and launches the property derivation process - /// for the specified logical expression. - async fn derive_logical_properties( - &self, - expression_id: LogicalExpressionId, - job_id: JobId, - ) -> Result<(), Error> { - let engine = Engine::new(self.hir_context.clone(), self.catalog.clone()); - - let plan: PartialLogicalPlan = self - .memo - .materialize_logical_expr(expression_id) - .await? - .into(); - - let message_tx = self.message_tx.clone(); - - tokio::spawn(async move { - let logical_expression_id = expression_id; - let response = engine - .launch_rule( - "derive", - vec![partial_logical_to_value(&plan)], - Arc::new(move |value| { - Box::pin(async move { - let properties = value_to_logical_properties(&value); - // TODO(yuchen): refactor EngineMessage type to include job id in header instead. - CreateGroup(logical_expression_id, properties, JobId(-1)) - }) - }), - ) - .await; - - Self::send_engine_response(job_id, message_tx, response).await; - }); - Ok(()) - } - - /// Executes a job to apply a transformation rule to a logical expression. - /// - /// This creates an engine instance and launches the transformation rule - /// application process for the specified logical expression. - async fn execute_transformation_rule( - &self, - rule_name: TransformationRule, - expression_id: LogicalExpressionId, - group_id: GroupId, - job_id: JobId, - ) -> Result<(), Error> { - let engine = Engine::new(self.hir_context.clone(), self.catalog.clone()); - - let plan: PartialLogicalPlan = self - .memo - .materialize_logical_expr(expression_id) - .await? - .into(); - - let message_tx = self.message_tx.clone(); - tokio::spawn(async move { - let response = engine - .launch_rule( - &rule_name.0, - vec![partial_logical_to_value(&plan)], - Arc::new(move |value| { - let plan = value_to_partial_logical(&value); - - Box::pin(async move { NewLogicalPartial(plan, group_id, JobId(-1)) }) - }), - ) - .await; - - Self::send_engine_response(job_id, message_tx, response).await; - }); - - Ok(()) - } - - /// Executes a job to apply an implementation rule to a logical expression. - /// - /// This creates an engine instance and launches the implementation rule - /// application process for the specified logical expression and goal. - async fn execute_implementation_rule( - &self, - rule_name: ImplementationRule, - expression_id: LogicalExpressionId, - goal_id: GoalId, - job_id: JobId, - ) -> Result<(), Error> { - let engine = Engine::new(self.hir_context.clone(), self.catalog.clone()); - - let Goal(_, physical_props) = self.memo.materialize_goal(goal_id).await?; - let plan = self - .memo - .materialize_logical_expr(expression_id) - .await? - .into(); - - let message_tx = self.message_tx.clone(); - tokio::spawn(async move { - let response = engine - .launch_rule( - &rule_name.0, - vec![ - partial_logical_to_value(&plan), - physical_properties_to_value(&physical_props), - ], - Arc::new(move |value| { - let plan = value_to_partial_physical(&value); - Box::pin(async move { NewPhysicalPartial(plan, goal_id, JobId(-1)) }) - }), - ) - .await; - Self::send_engine_response(job_id, message_tx, response).await; - }); - - Ok(()) - } - - /// Executes a job to compute the cost of a physical expression. - /// - /// This creates an engine instance and launches the cost calculation process - /// for the specified physical expression. - async fn execute_cost_expression( - &self, - expression_id: PhysicalExpressionId, - job_id: JobId, - ) -> Result<(), Error> { - let engine = Engine::new(self.hir_context.clone(), self.catalog.clone()); - - let plan = self.egest_partial_plan(expression_id).await?; - - let message_tx = self.message_tx.clone(); - tokio::spawn(async move { - let response = engine - .launch_rule( - "cost", - vec![partial_physical_to_value(&plan)], - Arc::new(move |value| { - let cost = value_to_cost(&value); - Box::pin(async move { NewCostedPhysical(expression_id, cost, JobId(-1)) }) - }), - ) - .await; - - Self::send_engine_response(job_id, message_tx, response).await; - }); - - Ok(()) - } - - /// Executes a job to continue processing with a logical expression result. - /// - /// This materializes the logical expression and passes it to the continuation. - async fn execute_continue_with_logical( - &self, - expression_id: LogicalExpressionId, - k: Continuation>, - ) -> Result<(), Error> { - let plan = self.memo.materialize_logical_expr(expression_id).await?; - - tokio::spawn(async move { - k(partial_logical_to_value(&plan.into())).await; - }); - - Ok(()) - } - - /// Executes a job to continue processing with an optimized physical expression result. - /// - /// This materializes the physical expression and passes it along with its cost - /// to the continuation. - async fn execute_continue_with_optimized( - &self, - physical_expr_id: PhysicalExpressionId, - cost: Cost, - k: Continuation>, - ) -> Result<(), Error> { - let plan = self.egest_partial_plan(physical_expr_id).await?; - let costed_plan_value = costed_physical_to_value(plan, cost); - - tokio::spawn(async move { - k(costed_plan_value).await; - }); - - Ok(()) - } -} diff --git a/optd/src/core/optimizer/memo.rs b/optd/src/core/optimizer/memo.rs deleted file mode 100644 index 8b137891..00000000 --- a/optd/src/core/optimizer/memo.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/optd/src/core/optimizer/merge.rs b/optd/src/core/optimizer/merge.rs deleted file mode 100644 index 6129fdab..00000000 --- a/optd/src/core/optimizer/merge.rs +++ /dev/null @@ -1,198 +0,0 @@ -use super::Optimizer; -use crate::core::{ - cir::{GoalId, GroupId}, - error::Error, - memo::{Memoize, MergeResult, MergedGoalInfo, MergedGroupInfo}, -}; - -impl Optimizer { - /// Helper method to handle different types of merge results. - /// - /// This method processes the results of group and goal merges, updating - /// subscribers and tasks appropriately. - /// - /// # Parameters - /// * `result` - The merge result to handle. - pub(super) async fn handle_merge_result(&mut self, result: MergeResult) -> Result<(), Error> { - // First, handle all the group merges. - for group_merge in result.group_merges { - let all_exprs_by_group = group_merge.merged_groups; - let new_repr_group_id = group_merge.new_repr_group_id; - - // 1. For each group, schedule expressions from all OTHER groups, - // ignoring any potential duplicates due to merges for now. - for (i, current_group_info) in all_exprs_by_group.iter().enumerate() { - let other_groups_exprs: Vec<_> = all_exprs_by_group - .iter() - .enumerate() - .filter(|(j, _)| *j != i) // Filter out the current group. - .flat_map(|(_, group_info)| &group_info.expressions) - .copied() - .collect(); - - self.schedule_logical_continuations( - current_group_info.group_id, - &other_groups_exprs, - ); - } - - // 2. Handle exploration tasks for the merged groups. - self.merge_exploration_tasks(&all_exprs_by_group, new_repr_group_id) - .await; - - // 3. Merge subscribers. - } - - // Second, handle all the goal merges. - for goal_merge in result.goal_merges { - let all_exprs_by_goal = &goal_merge.merged_goals; - - // 1. For each goal, schedule the best expression from all OTHER goals only if it is - // better than the current best expression for the goal. - for (i, current_goal_info) in all_exprs_by_goal.iter().enumerate() { - let current_cost = current_goal_info.best_expr.as_ref().map(|(_, cost)| cost); - - let best_from_others = all_exprs_by_goal - .iter() - .enumerate() - .filter(|(j, _)| *j != i) // Filter out the current goal. - .filter_map(|(_, goal_info)| goal_info.best_expr) - .filter(|(_, cost)| current_cost.is_none_or(|current| cost < current)) - .fold(None, |acc, (expr_id, cost)| match acc { - None => Some((expr_id, cost)), - Some((_, acc_cost)) if cost < acc_cost => Some((expr_id, cost)), - Some(_) => acc, - }); - - if let Some((best_expr_id, best_cost)) = best_from_others { - self.schedule_optimized_continuations( - current_goal_info.goal_id, - best_expr_id, - best_cost, - ); - self.egest_to_subscribers(current_goal_info.goal_id, best_expr_id) - .await?; - } - } - - // 2. Handling costing tasks for the merged goals. - self.merge_optimization_tasks(all_exprs_by_goal, goal_merge.new_repr_goal_id) - .await; - - // 3. Merge subscribers. - } - - // Third, launch the newly dirty stuff if needed. - - Ok(()) - } - - /// Helper method to merge exploration tasks for merged groups. - /// - /// # Parameters - /// * `all_exprs_by_group` - All groups and their expressions that were merged. - /// * `new_repr_group_id` - The new representative group ID. - async fn merge_exploration_tasks( - &mut self, - all_exprs_by_group: &[MergedGroupInfo], - new_repr_group_id: GroupId, - ) { - // Collect all task IDs associated with the merged groups. - let exploring_tasks: Vec<_> = all_exprs_by_group - .iter() - .filter_map(|group_info| { - self.group_exploration_task_index - .get(&group_info.group_id) - .copied() - .map(|task_id| (task_id, group_info.group_id)) - }) - .collect(); - - match exploring_tasks.as_slice() { - [] => (), // No tasks exist, nothing to do. - - [(task_id, group_id)] => { - // Just one task exists - update its index. - if *group_id != new_repr_group_id { - self.group_exploration_task_index.remove(group_id); - } - - self.group_exploration_task_index - .insert(new_repr_group_id, *task_id); - } - - [(primary_task_id, _), rest @ ..] => { - // Multiple tasks - merge them into the primary task. - let mut children_to_add = Vec::new(); - for (task_id, _) in rest { - let task = self.tasks.get(task_id).unwrap(); - children_to_add.extend(task.children.clone()); - self.tasks.remove(task_id); - } - - let primary_task = self.tasks.get_mut(primary_task_id).unwrap(); - primary_task.children.extend(children_to_add); - - for group_info in all_exprs_by_group { - self.group_exploration_task_index - .remove(&group_info.group_id); - } - - self.group_exploration_task_index - .insert(new_repr_group_id, *primary_task_id); - } - } - } - - /// Helper method to merge optimization tasks for merged goals. - async fn merge_optimization_tasks( - &mut self, - all_exprs_by_goal: &[MergedGoalInfo], - new_repr_goal_id: GoalId, - ) { - // Collect all task IDs associated with the merged goals. - let optimization_tasks: Vec<_> = all_exprs_by_goal - .iter() - .filter_map(|goal_info| { - self.goal_optimization_task_index - .get(&goal_info.goal_id) - .copied() - .map(|task_id| (task_id, goal_info.goal_id)) - }) - .collect(); - - match optimization_tasks.as_slice() { - [] => (), // No tasks exist, nothing to do. - - [(task_id, goal_id)] => { - // Just one task exists - update its index and kind. - if *goal_id != new_repr_goal_id { - self.goal_optimization_task_index.remove(goal_id); - } - - self.goal_optimization_task_index - .insert(new_repr_goal_id, *task_id); - } - - [(primary_task_id, _), rest @ ..] => { - // Multiple tasks - merge them into the primary task. - let mut children_to_add = Vec::new(); - for (task_id, _) in rest { - let task = self.tasks.get(task_id).unwrap(); - children_to_add.extend(task.children.clone()); - self.tasks.remove(task_id); - } - - let primary_task = self.tasks.get_mut(primary_task_id).unwrap(); - primary_task.children.extend(children_to_add); - - for goal_info in all_exprs_by_goal { - self.goal_optimization_task_index.remove(&goal_info.goal_id); - } - - self.goal_optimization_task_index - .insert(new_repr_goal_id, *primary_task_id); - } - } - } -} diff --git a/optd/src/core/optimizer/mod.rs b/optd/src/core/optimizer/mod.rs deleted file mode 100644 index 77104c91..00000000 --- a/optd/src/core/optimizer/mod.rs +++ /dev/null @@ -1,281 +0,0 @@ -use crate::catalog::Catalog; -use crate::core::cir::{ - Cost, Goal, GoalId, GroupId, LogicalExpressionId, LogicalPlan, LogicalProperties, - PartialLogicalPlan, PartialPhysicalPlan, PhysicalExpressionId, PhysicalPlan, RuleBook, -}; -use crate::core::error::Error; -use crate::core::memo::Memoize; -use crate::dsl::analyzer::hir::Value; -use crate::dsl::analyzer::{context::Context, hir::HIR}; -use crate::dsl::engine::{Continuation, EngineResponse}; -use OptimizerMessage::*; -use futures::StreamExt; -use futures::{ - SinkExt, - channel::mpsc::{self, Receiver, Sender}, -}; -use jobs::{Job, JobId}; -use std::collections::{HashMap, HashSet, VecDeque}; -use std::sync::Arc; -use tasks::{Task, TaskId}; - -mod egest; -mod handlers; -mod ingest; -mod jobs; -mod merge; -mod subscriptions; -mod tasks; - -/// Default maximum number of concurrent jobs to run in the optimizer. -const DEFAULT_MAX_CONCURRENT_JOBS: usize = 1000; - -/// External client request to optimize a query in the optimizer. -/// -/// Defines the public API for submitting a query and receiving execution plans. -#[derive(Clone, Debug)] -pub struct OptimizeRequest { - /// The logical plan to optimize. - pub plan: LogicalPlan, - - /// Channel for receiving optimized physical plans. - /// - /// Streams results back as they become available, allowing clients to: - /// * Receive progressively better plans during optimization. - /// * Terminate early when a "good enough" plan is found. - pub response_tx: Sender, -} - -/// Messages passed within the optimization system. -/// -/// Each message that includes a JobId represents the result of a completed job, -/// allowing the optimizer to track which tasks are progressing. -#[derive(Clone)] -enum OptimizerMessage { - /// Process an optimization request. - OptimizeRequestWrapper(OptimizeRequest, TaskId), - - /// New logical plan alternative for a group from applying transformation rules. - NewLogicalPartial(PartialLogicalPlan, GroupId, JobId), - - /// New physical implementation for a goal, awaiting recursive optimization. - NewPhysicalPartial(PartialPhysicalPlan, GoalId, JobId), - - /// Fully optimized physical expression with complete costing. - NewCostedPhysical(PhysicalExpressionId, Cost, JobId), - - /// Create a new group with the provided logical properties. - CreateGroup(LogicalExpressionId, LogicalProperties, JobId), - - /// Subscribe to logical expressions in a specific group. - SubscribeGroup( - GroupId, - Continuation>, - JobId, - ), - - /// Subscribe to costed physical expressions for a goal. - SubscribeGoal( - Goal, - Continuation>, - JobId, - ), - - /// Retrieve logical properties for a specific group. - #[allow(unused)] - RetrieveProperties(GroupId, Sender), -} - -/// A message that is waiting for dependencies before it can be processed. -/// -/// Tracks the set of job IDs that must exist before the message can be handled. -struct PendingMessage { - /// The message stashed for later processing. - message: OptimizerMessage, - - /// Set of job IDs whose groups must be created before this message can be processed. - pending_dependencies: HashSet, -} - -/// The central access point to the optimizer. -/// -/// Provides the interface to submit logical plans for optimization and receive -/// optimized physical plans in return. -pub struct Optimizer { - // Core components. - memo: M, - rule_book: RuleBook, - hir_context: Context, - catalog: Arc, - - // Message handling. - pending_messages: Vec, - message_tx: Sender, - message_rx: Receiver, - optimize_rx: Receiver, - - // Task management. - tasks: HashMap, - next_task_id: TaskId, - - // Job management. - pending_jobs: HashMap, - job_schedule_queue: VecDeque, - running_jobs: HashMap, - next_job_id: JobId, - max_concurrent_jobs: usize, - - // Task indexing. - group_exploration_task_index: HashMap, - goal_optimization_task_index: HashMap, - cost_expression_task_index: HashMap, - - // Subscriptions. - group_subscribers: HashMap>, - goal_subscribers: HashMap>, -} - -impl Optimizer { - /// Create a new optimizer instance with the given memo and HIR context. - /// - /// Use `launch` to create and start the optimizer. - fn new( - memo: M, - hir: HIR, - catalog: Arc, - message_tx: Sender, - message_rx: Receiver, - optimize_rx: Receiver, - ) -> Self { - Self { - // Core components. - memo, - rule_book: RuleBook::default(), - hir_context: hir.context, - catalog, - - // Message handling. - pending_messages: Vec::new(), - message_tx, - message_rx, - optimize_rx, - - // Task management. - tasks: HashMap::new(), - next_task_id: TaskId(0), - - // Job management. - pending_jobs: HashMap::new(), - job_schedule_queue: VecDeque::new(), - running_jobs: HashMap::new(), - next_job_id: JobId(0), - max_concurrent_jobs: DEFAULT_MAX_CONCURRENT_JOBS, - - // Task indexing. - group_exploration_task_index: HashMap::new(), - goal_optimization_task_index: HashMap::new(), - cost_expression_task_index: HashMap::new(), - - // Subscriptions. - group_subscribers: HashMap::new(), - goal_subscribers: HashMap::new(), - } - } - - /// Launch a new optimizer and return a sender for client communication. - pub fn launch(memo: M, hir: HIR, catalog: Arc) -> Sender { - let (message_tx, message_rx) = mpsc::channel(0); - let (optimize_tx, optimize_rx) = mpsc::channel(0); - - // Start the background processing loop. - let optimizer = Self::new( - memo, - hir, - catalog, - message_tx.clone(), - message_rx, - optimize_rx, - ); - tokio::spawn(async move { - // TODO(Alexis): If an error occurs we could restart or reboot the memo. - // Rather than failing (e.g. memo could be distributed). - optimizer.run().await.expect("Optimizer failure"); - }); - - optimize_tx - } - - /// Run the optimizer's main processing loop. - async fn run(mut self) -> Result<(), Error> { - loop { - tokio::select! { - Some(request) = self.optimize_rx.next() => { - let OptimizeRequest { plan, response_tx } = request.clone(); - let task_id = self.launch_optimize_plan_task(plan, response_tx).await; - let mut message_tx = self.message_tx.clone(); - - // Forward the optimization request to the message processing loop - // in a new coroutine to avoid a deadlock. - tokio::spawn( - async move { - message_tx.send(OptimizeRequestWrapper(request, task_id)) - .await - .expect("Failed to forward optimize request"); - } - ); - }, - Some(message) = self.message_rx.next() => { - // Process the next message in the channel. - match message { - OptimizeRequestWrapper(request, task_id_opt) => { - self.process_optimize_request(request.plan, request.response_tx, task_id_opt).await?; - } - NewLogicalPartial(plan, group_id, job_id) => { - if self.get_related_task(job_id).is_some() { - self.process_new_logical_partial(plan, group_id, job_id).await?; - self.complete_job(job_id).await?; - } - } - NewPhysicalPartial(plan, goal_id, job_id) => { - if self.get_related_task(job_id).is_some() { - self.process_new_physical_partial(plan, goal_id, job_id).await?; - self.complete_job(job_id).await?; - } - } - NewCostedPhysical(expression_id, cost, job_id) => { - if self.get_related_task(job_id).is_some() { - self.process_new_costed_physical(expression_id, cost).await?; - self.complete_job(job_id).await?; - } - } - CreateGroup( expression_id, properties, job_id) => { - if self.get_related_task(job_id).is_some() { - self.process_create_group(expression_id, &properties, job_id).await?; - self.complete_job(job_id).await?; - } - } - SubscribeGroup(group_id, continuation, job_id) => { - if self.get_related_task(job_id).is_some() { - self.process_group_subscription(group_id, continuation, job_id).await?; - self.complete_job(job_id).await?; - } - } - SubscribeGoal(goal, continuation, job_id) => { - if self.get_related_task(job_id).is_some() { - self.process_goal_subscription(&goal, continuation, job_id).await?; - self.complete_job(job_id).await?; - } - } - RetrieveProperties(group_id, sender) => { - self.process_retrieve_properties(group_id, sender).await?; - } - }; - - // Launch pending jobs according to a policy (currently FIFO). - self.launch_pending_jobs().await?; - }, - else => break Ok(()), - } - } - } -} diff --git a/optd/src/core/optimizer/subscriptions.rs b/optd/src/core/optimizer/subscriptions.rs deleted file mode 100644 index f0ec411c..00000000 --- a/optd/src/core/optimizer/subscriptions.rs +++ /dev/null @@ -1,205 +0,0 @@ -use super::{ - Optimizer, - jobs::JobKind, - tasks::{TaskId, TaskKind}, -}; -use crate::core::{ - cir::{Cost, GoalId, GroupId, LogicalExpressionId, PhysicalExpressionId}, - error::Error, - memo::Memoize, -}; -use JobKind::*; -use TaskKind::*; -use futures::SinkExt; - -impl Optimizer { - /// Subscribe a task to logical expressions in a specific group. - /// - /// This method adds a task as a subscriber to a group, ensures there's an exploration - /// task for the group, and returns all existing expressions for bootstrapping. - /// - /// # Parameters - /// * `group_id` - The ID of the group to subscribe to. - /// * `subscriber_task_id` - The ID of the task that wants to receive notifications. - /// - /// # Returns - /// A vector of existing logical expressions in the group that the subscriber - /// can use for initialization. - pub(super) async fn subscribe_task_to_group( - &mut self, - group_id: GroupId, - subscriber_task_id: TaskId, - ) -> Result, Error> { - let subscribers = self.group_subscribers.entry(group_id).or_default(); - if !subscribers.contains(&subscriber_task_id) { - subscribers.push(subscriber_task_id); - } - - self.ensure_group_exploration_task(group_id, subscriber_task_id) - .await?; - - self.memo.get_all_logical_exprs(group_id).await - } - - /// Subscribe a task to optimized expressions for a specific goal. - /// - /// This method adds a task as a subscriber to a goal, ensures there's an exploration - /// task for the goal, and returns the best existing optimized expression for bootstrapping. - /// - /// # Parameters - /// * `goal_id` - The ID of the goal to subscribe to. - /// * `subscriber_task_id` - The ID of the task that wants to receive notifications. - /// - /// # Returns - /// The best optimized expression for the goal if one exists, or None if no - /// optimized expression is available yet. - pub(super) async fn subscribe_task_to_goal( - &mut self, - goal_id: GoalId, - subscriber_task_id: TaskId, - ) -> Result, Error> { - let subscribers = self.goal_subscribers.entry(goal_id).or_default(); - if !subscribers.contains(&subscriber_task_id) { - subscribers.push(subscriber_task_id); - } - - self.ensure_goal_optimize_task(goal_id, subscriber_task_id) - .await?; - - self.memo.get_best_optimized_physical_expr(goal_id).await - } - - /// Schedules logical expression continuation jobs for group subscribers. - /// - /// # Parameters - /// * `group_id` - The ID of the group that has new expressions. - /// * `expression_ids` - A slice of logical expression IDs to continue with. - pub(super) fn schedule_logical_continuations( - &mut self, - group_id: GroupId, - expression_ids: &[LogicalExpressionId], - ) { - // Skip processing if there are no expressions or subscribers. - let Some(subscribers) = self.group_subscribers.get(&group_id) else { - return; - }; - - let all_continuation_jobs: Vec<_> = subscribers - .iter() - .filter_map(|&task_id| { - self.tasks.get(&task_id).and_then(|task| { - let continuations = match &task.kind { - TransformExpression(task) => task.continuations.get(&group_id), - ImplementExpression(task) => task.continuations.get(&group_id), - _ => None, - }; - continuations.map(|conts| (task_id, conts)) - }) - }) - .flat_map(|(task_id, conts)| { - expression_ids.iter().flat_map(move |&expr_id| { - conts - .iter() - .map(move |cont| (task_id, ContinueWithLogical(expr_id, cont.clone()))) - }) - }) - .collect(); - - for (task_id, job) in all_continuation_jobs { - self.schedule_job(task_id, job); - } - } - - /// Schedules optimized expression continuation jobs for goal subscribers. - /// - /// # Parameters - /// * `goal_id` - The ID of the goal that has new best expressions. - /// * `expression_id` - The ID of the optimized expression to continue with. - /// * `cost` - The cost of the optimized expression. - pub(super) fn schedule_optimized_continuations( - &mut self, - goal_id: GoalId, - expression_id: PhysicalExpressionId, - cost: Cost, - ) { - // Skip processing if there are no expressions or subscribers. - let Some(subscribers) = self.goal_subscribers.get(&goal_id) else { - return; - }; - - let all_continuation_jobs: Vec<_> = subscribers - .iter() - .filter_map(|&task_id| { - self.tasks.get(&task_id).and_then(|task| match &task.kind { - CostExpression(cost_task) => cost_task - .continuations - .get(&goal_id) - .map(|conts| (task_id, conts)), - _ => None, - }) - }) - .flat_map(|(task_id, conts)| { - conts.iter().map(move |cont| { - ( - task_id, - ContinueWithCostedPhysical(expression_id, cost, cont.clone()), - ) - }) - }) - .collect(); - - // Schedule all collected jobs in batch. - for (task_id, job) in all_continuation_jobs { - self.schedule_job(task_id, job); - } - } - - /// Egests and sends optimized plans to optimize plan task subscribers. - /// - /// This method converts the optimized expression to a physical plan and - /// sends it to any optimize plan tasks that are waiting for results. - /// - /// # Parameters - /// * `goal_id` - The ID of the goal that has a new best expression. - /// * `expression_id` - The ID of the optimized expression to egest as a physical plan. - pub(super) async fn egest_to_subscribers( - &mut self, - goal_id: GoalId, - expression_id: PhysicalExpressionId, - ) -> Result<(), Error> { - // Find all optimize plan tasks that are subscribed to this root goal. - let send_channels: Vec<_> = self - .goal_subscribers - .get(&goal_id) - .into_iter() - .flatten() - .filter_map(|task_id| { - self.tasks.get(task_id).and_then(|task| { - if let OptimizePlan(plan_task) = &task.kind { - Some(plan_task.response_tx.clone()) - } else { - None - } - }) - }) - .collect(); - - // If we have any optimize plan tasks, egest the plan and send it - // without blocking the optimizer. - if !send_channels.is_empty() { - let physical_plan = self.egest_best_plan(expression_id).await?.unwrap(); - - for mut response_tx in send_channels { - let plan_clone = physical_plan.clone(); - tokio::spawn(async move { - response_tx - .send(plan_clone) - .await - .expect("Failed to send plan - channel closed."); - }); - } - } - - Ok(()) - } -} diff --git a/optd/src/core/optimizer/tasks.rs b/optd/src/core/optimizer/tasks.rs deleted file mode 100644 index ec6af4e0..00000000 --- a/optd/src/core/optimizer/tasks.rs +++ /dev/null @@ -1,494 +0,0 @@ -#![allow(dead_code)] - -use super::{ - Optimizer, OptimizerMessage, - jobs::{JobId, JobKind}, -}; -use crate::core::{ - cir::{ - Goal, GoalId, GoalMemberId, GroupId, ImplementationRule, LogicalExpressionId, LogicalPlan, - PhysicalExpressionId, PhysicalPlan, PhysicalProperties, TransformationRule, - }, - error::Error, - memo::{Memoize, Status}, -}; -use crate::dsl::{ - analyzer::hir::Value, - engine::{Continuation, EngineResponse}, -}; -use JobKind::*; -use TaskKind::*; -use futures::channel::mpsc::Sender; -use std::collections::{HashMap, HashSet}; - -//============================================================================= -// Type definitions and core structures -//============================================================================= - -/// Unique identifier for tasks in the optimization system. -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] -pub(super) struct TaskId(pub i64); - -/// A task represents a higher-level objective in the optimization process. -/// -/// Tasks are composed of one or more jobs and may depend on other tasks. -/// They represent structured, potentially hierarchical components of the -/// optimization process. -pub(super) struct Task { - /// Tasks that created this task (parent tasks). - pub parents: HashSet, - - /// Tasks that were created by this task (children tasks). - pub children: HashSet, - - /// The specific kind of task. - pub kind: TaskKind, - - /// Set of job IDs that must complete before this task is (temporarily) finished. - pub uncompleted_jobs: HashSet, -} - -impl Task { - /// Creates a new task with the specified kind and empty parent, child list, and job set. - fn new(kind: TaskKind) -> Self { - Self { - parents: HashSet::new(), - children: HashSet::new(), - kind, - uncompleted_jobs: HashSet::new(), - } - } -} - -/// Enumeration of different types of tasks in the optimizer. -/// -/// Each variant represents a structured component of the optimization process -/// that may launch multiple jobs and coordinate their execution. -pub(super) enum TaskKind { - /// Top-level task to optimize a logical plan. - OptimizePlan(OptimizePlanTask), - - /// Task to optimize a specific goal. - OptimizeGoal(GoalId), - - /// Task to explore expressions in a logical group. - ExploreGroup(GroupId), - - /// Task to apply a specific implementation rule to a logical expression. - ImplementExpression(ImplementExpressionTask), - - /// Task to apply a specific transformation rule to a logical expression. - TransformExpression(TransformExpressionTask), - - /// Task to compute the cost of a physical expression. - CostExpression(CostExpressionTask), -} - -//============================================================================= -// Task variant structs -//============================================================================= - -/// Task data for optimizing a logical plan. -pub(super) struct OptimizePlanTask { - /// The logical plan to be optimized. - pub plan: LogicalPlan, - - /// Channel to send the optimized physical plan back to the caller. - pub response_tx: Sender, -} - -impl OptimizePlanTask { - pub fn new(plan: LogicalPlan, response_tx: Sender) -> Self { - Self { plan, response_tx } - } -} - -/// Task data for implementing a group with specific physical properties for a goal. -pub(super) struct ImplementGroupTask { - /// The group to implement. - pub group_id: GroupId, - - /// The physical properties to implement for. - pub properties: PhysicalProperties, - - /// The goal ID this implementation is for. - pub goal_id: GoalId, -} - -impl ImplementGroupTask { - pub fn new(group_id: GroupId, properties: PhysicalProperties, goal_id: GoalId) -> Self { - Self { - group_id, - properties, - goal_id, - } - } -} - -/// Task data for implementing a logical expression using a specific rule. -pub(super) struct ImplementExpressionTask { - /// The implementation rule to apply. - pub rule: ImplementationRule, - - /// Whether the task has started the implementation rule. - pub has_started: bool, - - /// The logical expression to implement. - pub expression_id: LogicalExpressionId, - - /// The goal ID for this implementation. - pub goal_id: GoalId, - - /// Continuations for each group that need to be notified when - /// new logical expressions are created. - pub continuations: HashMap>>>, -} - -impl ImplementExpressionTask { - pub fn new( - rule: ImplementationRule, - has_started: bool, - expression_id: LogicalExpressionId, - goal_id: GoalId, - ) -> Self { - Self { - rule, - has_started, - expression_id, - goal_id, - continuations: HashMap::new(), - } - } -} - -/// Task data for transforming a logical expression using a specific rule. -pub(super) struct TransformExpressionTask { - /// The transformation rule to apply. - pub rule: TransformationRule, - - /// Whether the task has started the transformation rule. - pub has_started: bool, - - /// The logical expression to transform. - pub expression_id: LogicalExpressionId, - - /// Continuations for each group that need to be notified when - /// new logical expressions are created. - pub continuations: HashMap>>>, -} - -impl TransformExpressionTask { - pub fn new( - rule: TransformationRule, - has_started: bool, - expression_id: LogicalExpressionId, - ) -> Self { - Self { - rule, - has_started, - expression_id, - continuations: HashMap::new(), - } - } -} - -/// Task data for costing a physical expression. -pub(super) struct CostExpressionTask { - /// The physical expression to cost. - pub expression_id: PhysicalExpressionId, - - /// Whether the task has started the cost estimation. - pub has_started: bool, - - /// Continuations for each goal that need to be notified when - /// optimized expressions are created. - pub continuations: HashMap>>>, -} - -impl CostExpressionTask { - pub fn new(expression_id: PhysicalExpressionId, has_started: bool) -> Self { - Self { - expression_id, - has_started, - continuations: HashMap::new(), - } - } -} - -//============================================================================= -// Optimizer task implementation -//============================================================================= - -impl Optimizer { - //------------------------------------------------------------------------- - // Public API - //------------------------------------------------------------------------- - - /// Launches a new task to optimize a logical plan into a physical plan. - /// - /// This method creates and registers a task that will optimize the provided logical - /// plan into a physical plan. The optimized plan will be sent back through the provided - /// response channel every time a better plan is found. - pub(super) async fn launch_optimize_plan_task( - &mut self, - plan: LogicalPlan, - response_tx: Sender, - ) -> TaskId { - let task = OptimizePlanTask::new(plan, response_tx); - self.register_new_task(OptimizePlan(task), None) - } - - /// Ensures a group exploration task exists and sets up a parent-child relationship. - /// - /// This is used when a task needs to explore all possible expressions in a group - /// as part of its work. If an exploration task already exists, we reuse it. - pub(super) async fn ensure_group_exploration_task( - &mut self, - group_id: GroupId, - parent_task_id: TaskId, - ) -> Result<(), Error> { - let task_id = match self.group_exploration_task_index.get(&group_id) { - Some(id) => *id, - None => { - self.launch_group_exploration_task(group_id, parent_task_id) - .await? - } - }; - - self.register_parent_child_relationship(parent_task_id, task_id); - Ok(()) - } - - /// Ensures a goal optimization task exists and sets up a parent-child relationship. - /// - /// This is used when a task needs to optimize a goal as part of its work. - /// If an optimization task already exists, we reuse it. - pub(super) async fn ensure_goal_optimize_task( - &mut self, - goal_id: GoalId, - parent_task_id: TaskId, - ) -> Result<(), Error> { - let task_id = match self.goal_optimization_task_index.get(&goal_id) { - Some(id) => *id, - None => { - self.launch_goal_optimize_task(goal_id, parent_task_id) - .await? - } - }; - - self.register_parent_child_relationship(parent_task_id, task_id); - Ok(()) - } - - /// Ensures a cost expression task exists and sets up a parent-child relationship. - /// - /// This is used when a task needs to cost a physical expression as part of its work. - /// If a costing task already exists, we reuse it. - pub(super) async fn ensure_cost_expression_task( - &mut self, - expression_id: PhysicalExpressionId, - parent_task_id: TaskId, - ) -> Result<(), Error> { - let task_id = match self.cost_expression_task_index.get(&expression_id) { - Some(id) => *id, - None => { - let is_dirty = self.memo.get_cost_status(expression_id).await? == Status::Dirty; - let task = CostExpressionTask::new(expression_id, is_dirty); - let task_id = self.register_new_task(CostExpression(task), Some(parent_task_id)); - self.cost_expression_task_index - .insert(expression_id, task_id); - - if is_dirty { - self.schedule_job(task_id, StartCostExpression(expression_id)); - } - - task_id - } - }; - - self.register_parent_child_relationship(parent_task_id, task_id); - Ok(()) - } - - //------------------------------------------------------------------------- - // Internal task launching methods - //------------------------------------------------------------------------- - - /// Launches a task to start applying a transformation rule to a logical expression. - /// - /// This task generates alternative logical expressions that are - /// semantically equivalent to the original. It maintains a set of continuations - /// that will be notified of the transformation results. - /// - /// Only schedules the starting job if the transformation is marked as dirty in the memo. - async fn launch_transform_expression_task( - &mut self, - rule: TransformationRule, - expression_id: LogicalExpressionId, - group_id: GroupId, - parent_task_id: TaskId, - ) -> Result { - let is_dirty = self - .memo - .get_transformation_status(expression_id, &rule) - .await? - == Status::Dirty; - - let task = TransformExpressionTask::new(rule.clone(), is_dirty, expression_id); - let task_id = self.register_new_task(TransformExpression(task), Some(parent_task_id)); - - if is_dirty { - self.schedule_job( - task_id, - StartTransformationRule(rule, expression_id, group_id), - ); - } - - Ok(task_id) - } - - /// Launches a task to start applying an implementation rule to a logical expression. - /// - /// This task generates physical implementations from a logical expression - /// using a specified implementation strategy. It maintains a set of continuations - /// that will be notified of the implementation results. - /// - /// Only schedules the starting job if the implementation is marked as dirty in the memo. - async fn launch_implement_expression_task( - &mut self, - rule: ImplementationRule, - expression_id: LogicalExpressionId, - goal_id: GoalId, - parent_task_id: TaskId, - ) -> Result { - let is_dirty = self - .memo - .get_implementation_status(expression_id, goal_id, &rule) - .await? - == Status::Dirty; - - let task = ImplementExpressionTask::new(rule.clone(), is_dirty, expression_id, goal_id); - let task_id = self.register_new_task(ImplementExpression(task), Some(parent_task_id)); - - if is_dirty { - self.schedule_job( - task_id, - StartImplementationRule(rule, expression_id, goal_id), - ); - } - - Ok(task_id) - } - - /// Launches a new task to explore all possible transformations for a logical group. - /// - /// This schedules jobs to apply all available transformation rules to all - /// logical expressions in the group. - async fn launch_group_exploration_task( - &mut self, - group_id: GroupId, - parent_task_id: TaskId, - ) -> Result { - let task_id = self.register_new_task(ExploreGroup(group_id), Some(parent_task_id)); - self.group_exploration_task_index.insert(group_id, task_id); - - // Launch the transformation task for all expression-rule combinations. - let transformations = self.rule_book.get_transformations().to_vec(); - let expressions = self.memo.get_all_logical_exprs(group_id).await?; - - for expression_id in expressions { - for rule in &transformations { - self.launch_transform_expression_task( - rule.clone(), - expression_id, - group_id, - task_id, - ) - .await?; - } - } - - Ok(task_id) - } - - /// Launches a new task to optimize a goal. - /// - /// This method creates and manages the tasks needed to optimize a goal by - /// ensuring group exploration, launching implementation tasks, and processing goal members. - async fn launch_goal_optimize_task( - &mut self, - goal_id: GoalId, - parent_task_id: TaskId, - ) -> Result { - let task_id = self.register_new_task(OptimizeGoal(goal_id), Some(parent_task_id)); - self.goal_optimization_task_index.insert(goal_id, task_id); - - // Launch implementation tasks for all expression-rule combinations. - let Goal(group_id, _) = self.memo.materialize_goal(goal_id).await?; - self.ensure_group_exploration_task(group_id, task_id) - .await?; - - // Launch implementation tasks for all logical expressions in the group. - let logical_expressions = self.memo.get_all_logical_exprs(group_id).await?; - let implementations = self.rule_book.get_implementations().to_vec(); - - for expr_id in logical_expressions { - for rule in &implementations { - self.launch_implement_expression_task(rule.clone(), expr_id, goal_id, task_id) - .await?; - } - } - - // Process all goal members: physical expressions and subgoals. - let goal_members = self.memo.get_all_goal_members(goal_id).await?; - for member in goal_members { - match member { - GoalMemberId::PhysicalExpressionId(expr_id) => { - self.ensure_cost_expression_task(expr_id, task_id).await?; - } - GoalMemberId::GoalId(referenced_goal_id) => { - self.ensure_goal_optimize_task(referenced_goal_id, task_id) - .await?; - } - } - } - - Ok(task_id) - } - - //------------------------------------------------------------------------- - // Helper methods for task management - //------------------------------------------------------------------------- - - /// Helper method to register a new task of a specified kind. - /// - /// Assigns a unique task ID, adds the task to the task registry, - /// and sets up the parent relationship if provided. - fn register_new_task(&mut self, kind: TaskKind, parent: Option) -> TaskId { - // Generate a unique task ID. - let task_id = self.next_task_id; - self.next_task_id.0 += 1; - - // Create and register the task. - let mut task = Task::new(kind); - if let Some(parent_id) = parent { - task.parents.insert(parent_id); - } - self.tasks.insert(task_id, task); - - task_id - } - - /// Sets up the bidirectional parent-child relationship between tasks. - /// - /// Updates both the parent's children list and the child's parents set. - fn register_parent_child_relationship(&mut self, parent_id: TaskId, child_id: TaskId) { - // Add child to parent's children list if not already there. - let parent = self.tasks.get_mut(&parent_id).unwrap(); - parent.children.insert(child_id); - - // Add parent to child's parents set. - let child = self.tasks.get_mut(&child_id).unwrap(); - child.parents.insert(parent_id); - } -}