diff --git a/core/src/compaction/auto.rs b/core/src/compaction/auto.rs index 54a8cc7..9457b79 100644 --- a/core/src/compaction/auto.rs +++ b/core/src/compaction/auto.rs @@ -20,6 +20,7 @@ //! [`AutoCompaction`] for end-to-end automatic compaction workflows. use std::borrow::Cow; +use std::num::NonZeroUsize; use std::sync::Arc; use iceberg::scan::FileScanTask; @@ -35,6 +36,65 @@ use crate::config::AutoCompactionConfig; use crate::executor::ExecutorType; use crate::file_selection::{FileSelector, PlanStrategy, SnapshotStats}; +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum AutoSelectedStrategy { + FilesWithDeletes, + SmallFiles, +} + +impl AutoSelectedStrategy { + fn from_planning_config(config: &crate::config::CompactionPlanningConfig) -> Option { + match config { + crate::config::CompactionPlanningConfig::FilesWithDeletes(_) => { + Some(AutoSelectedStrategy::FilesWithDeletes) + } + crate::config::CompactionPlanningConfig::SmallFiles(_) => { + Some(AutoSelectedStrategy::SmallFiles) + } + crate::config::CompactionPlanningConfig::Full(_) => None, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum AutoPlanReason { + Recommended, + NoSnapshot, + NoCandidate, + NoPlansProduced, + BudgetCapped, +} + +#[derive(Debug, Clone)] +pub struct AutoPlanReport { + pub selected_strategy: Option, + pub plans: Vec, + /// Total input bytes for selected plans: data + delete files. + pub planned_input_bytes: u64, + /// Total input file count for selected plans: data + delete files. + pub planned_input_files: usize, + /// `planned_data_bytes / total_data_bytes` (0 if total is 0). 
+ pub rewrite_ratio: f64, + pub reason: AutoPlanReason, +} + +impl AutoPlanReport { + fn empty(reason: AutoPlanReason) -> Self { + Self { + selected_strategy: None, + plans: vec![], + planned_input_bytes: 0, + planned_input_files: 0, + rewrite_ratio: 0.0, + reason, + } + } +} + +fn compute_total_data_bytes(tasks: &[FileScanTask]) -> u64 { + tasks.iter().map(|t| t.length).sum() +} + /// Planner that performs analysis and plan generation in a single scan. /// /// Combines snapshot analysis (stats computation) and file grouping into one @@ -50,51 +110,210 @@ impl AutoCompactionPlanner { /// Plans compaction for a table branch. /// - /// Returns empty vector if no files need compaction. + /// Returns an empty vector when the planner does not return executable plans. + /// + /// Use [`plan_compaction_report_with_branch`](Self::plan_compaction_report_with_branch) + /// when callers need to distinguish between `NoSnapshot`, `NoCandidate`, + /// `NoPlansProduced`, and budget-capped empty results. pub async fn plan_compaction_with_branch( &self, table: &Table, to_branch: &str, ) -> Result> { + let report = self + .plan_compaction_report_with_branch(table, to_branch) + .await?; + + Ok(report.plans) + } + + /// Plans compaction for a table branch and returns a report including cost and reason. 
+ pub async fn plan_compaction_report_with_branch( + &self, + table: &Table, + to_branch: &str, + ) -> Result { let Some(snapshot) = table.metadata().snapshot_for_ref(to_branch) else { - return Ok(vec![]); + return Ok(AutoPlanReport::empty(AutoPlanReason::NoSnapshot)); }; let snapshot_id = snapshot.snapshot_id(); - let tasks = FileSelector::scan_data_files(table, snapshot_id).await?; - let stats = Self::compute_stats(&tasks, self.config.small_file_threshold_bytes); + let mut tasks = Some(FileSelector::scan_data_files(table, snapshot_id).await?); + let total_data_bytes = compute_total_data_bytes(tasks.as_ref().unwrap()); + let stats = Self::compute_stats( + tasks.as_ref().unwrap(), + self.config.small_file_threshold_bytes, + self.config.min_delete_file_count_threshold, + ); + + let delete_candidate = self.config.files_with_deletes_candidate(&stats); + let small_candidate = self.config.small_files_candidate(&stats); + if delete_candidate.is_none() && small_candidate.is_none() { + return Ok(AutoPlanReport::empty(AutoPlanReason::NoCandidate)); + } - let Some(planning_config) = self.config.resolve(&stats) else { - return Ok(vec![]); + let delete_report = if let Some(planning_config) = delete_candidate { + let delete_tasks = if small_candidate.is_some() { + tasks.as_ref().unwrap().clone() + } else { + tasks.take().unwrap() + }; + let report = Self::build_report( + delete_tasks, + planning_config, + to_branch, + snapshot_id, + total_data_bytes, + AutoPlanReason::Recommended, + )?; + if report.plans.is_empty() { + Some(report) + } else { + return Ok(Self::cap_report_plans( + report, + total_data_bytes, + self.config.max_auto_plans_per_run, + )); + } + } else { + None }; + let small_report = if let Some(planning_config) = small_candidate { + Some(Self::build_report( + tasks.take().unwrap(), + planning_config, + to_branch, + snapshot_id, + total_data_bytes, + AutoPlanReason::Recommended, + )?) 
+ } else { + None + }; + + Ok(Self::select_report( + delete_report, + small_report, + total_data_bytes, + self.config.max_auto_plans_per_run, + )) + } + + fn build_report( + tasks: Vec, + planning_config: crate::config::CompactionPlanningConfig, + to_branch: &str, + snapshot_id: i64, + total_data_bytes: u64, + reason: AutoPlanReason, + ) -> Result { + let selected_strategy = AutoSelectedStrategy::from_planning_config(&planning_config); let strategy = PlanStrategy::from(&planning_config); let file_groups = FileSelector::group_tasks_with_strategy(tasks, strategy, &planning_config)?; - let plans = file_groups + let plans: Vec = file_groups .into_iter() .map(|fg| CompactionPlan::new(fg, to_branch.to_owned(), snapshot_id)) .filter(|p| p.has_files()) .collect(); + Ok(Self::report_from_plans( + selected_strategy, + plans, + total_data_bytes, + reason, + )) + } + + fn cap_report_plans( + report: AutoPlanReport, + total_data_bytes: u64, + max_plans: NonZeroUsize, + ) -> AutoPlanReport { + if report.plans.is_empty() { + return AutoPlanReport { + reason: AutoPlanReason::NoPlansProduced, + ..report + }; + } + + if report.plans.len() <= max_plans.get() { + return report; + } - Ok(plans) + let plans: Vec = report.plans.into_iter().take(max_plans.get()).collect(); + Self::report_from_plans( + report.selected_strategy, + plans, + total_data_bytes, + AutoPlanReason::BudgetCapped, + ) + } + + fn select_report( + delete_report: Option, + small_report: Option, + total_data_bytes: u64, + max_plans: NonZeroUsize, + ) -> AutoPlanReport { + if let Some(report) = delete_report.filter(|report| !report.plans.is_empty()) { + return Self::cap_report_plans(report, total_data_bytes, max_plans); + } + + if let Some(report) = small_report.filter(|report| !report.plans.is_empty()) { + return Self::cap_report_plans(report, total_data_bytes, max_plans); + } + + AutoPlanReport::empty(AutoPlanReason::NoPlansProduced) + } + + fn report_from_plans( + selected_strategy: Option, + plans: Vec, + 
total_data_bytes: u64, + reason: AutoPlanReason, + ) -> AutoPlanReport { + let planned_data_bytes = plans.iter().map(|p| p.file_group.total_size).sum::(); + let planned_input_bytes = plans.iter().map(CompactionPlan::total_bytes).sum(); + let planned_input_files = plans.iter().map(CompactionPlan::file_count).sum(); + let rewrite_ratio = if total_data_bytes == 0 { + 0.0 + } else { + planned_data_bytes as f64 / total_data_bytes as f64 + }; + + AutoPlanReport { + selected_strategy, + plans, + planned_input_bytes, + planned_input_files, + rewrite_ratio, + reason, + } } /// Computes statistics from pre-scanned tasks without additional IO. - fn compute_stats(tasks: &[FileScanTask], small_file_threshold_bytes: u64) -> SnapshotStats { + fn compute_stats( + tasks: &[FileScanTask], + small_file_threshold_bytes: u64, + min_delete_file_count_threshold: usize, + ) -> SnapshotStats { let mut stats = SnapshotStats::default(); for task in tasks { stats.total_data_files += 1; - if task.length < small_file_threshold_bytes { + let is_small = task.length < small_file_threshold_bytes; + if is_small { stats.small_files_count += 1; } - if !task.deletes.is_empty() { - stats.files_with_deletes_count += 1; + let is_delete_heavy = min_delete_file_count_threshold > 0 + && task.deletes.len() >= min_delete_file_count_threshold; + if is_delete_heavy { + stats.delete_heavy_files_count += 1; } } @@ -187,8 +406,8 @@ impl AutoCompactionBuilder { /// Automatic compaction with runtime strategy selection. /// -/// Selects the appropriate compaction strategy (small files, files with deletes, -/// or full) based on snapshot statistics and executes the compaction workflow. +/// Selects between localized `FilesWithDeletes` and `SmallFiles` plans based on +/// snapshot statistics and executes the compaction workflow. pub struct AutoCompaction { inner: Compaction, auto_config: AutoCompactionConfig, @@ -197,7 +416,8 @@ pub struct AutoCompaction { impl AutoCompaction { /// Runs automatic compaction. 
/// - /// Returns `None` if no strategy matches or no files need compaction. + /// Returns `None` when `Auto` does not produce executable plans or when no + /// rewrite results are produced. pub async fn compact(&self) -> Result> { let overall_start_time = std::time::Instant::now(); @@ -247,3 +467,184 @@ impl AutoCompaction { Ok(Some(merged_result)) } } + +#[cfg(test)] +mod tests { + use iceberg::spec::{DataContentType, DataFileFormat, Schema}; + + use super::*; + + #[test] + fn test_compute_total_data_bytes() { + fn make_task(length: u64, path: &str) -> FileScanTask { + FileScanTask { + start: 0, + length, + record_count: Some(1), + data_file_path: path.to_owned(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: std::sync::Arc::new(Schema::builder().build().unwrap()), + project_field_ids: vec![], + predicate: None, + deletes: vec![], + sequence_number: 1, + equality_ids: None, + file_size_in_bytes: length, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: true, + } + } + + let tasks = vec![make_task(10, "a.parquet"), make_task(20, "b.parquet")]; + assert_eq!(compute_total_data_bytes(&tasks), 30); + } + + fn make_task(length: u64, path: &str) -> FileScanTask { + FileScanTask { + start: 0, + length, + record_count: Some(1), + data_file_path: path.to_owned(), + data_file_content: DataContentType::Data, + data_file_format: DataFileFormat::Parquet, + schema: std::sync::Arc::new(Schema::builder().build().unwrap()), + project_field_ids: vec![], + predicate: None, + deletes: vec![], + sequence_number: 1, + equality_ids: None, + file_size_in_bytes: length, + partition: None, + partition_spec: None, + name_mapping: None, + case_sensitive: true, + } + } + + fn make_plan(length: u64, path: &str) -> CompactionPlan { + CompactionPlan::new( + crate::file_selection::FileGroup::new(vec![make_task(length, path)]), + "main", + 1, + ) + } + + fn make_report( + selected_strategy: AutoSelectedStrategy, 
+ lengths: &[u64], + total_data_bytes: u64, + ) -> AutoPlanReport { + AutoCompactionPlanner::report_from_plans( + Some(selected_strategy), + lengths + .iter() + .enumerate() + .map(|(idx, length)| make_plan(*length, &format!("{idx}.parquet"))) + .collect(), + total_data_bytes, + AutoPlanReason::Recommended, + ) + } + + #[test] + fn test_cap_report_plans_limits_plan_count() { + let report = AutoPlanReport { + selected_strategy: Some(AutoSelectedStrategy::FilesWithDeletes), + plans: vec![ + make_plan(10, "a.parquet"), + make_plan(20, "b.parquet"), + make_plan(30, "c.parquet"), + ], + planned_input_bytes: 60, + planned_input_files: 3, + rewrite_ratio: 0.6, + reason: AutoPlanReason::Recommended, + }; + + let capped = + AutoCompactionPlanner::cap_report_plans(report, 100, NonZeroUsize::new(2).unwrap()); + assert_eq!(capped.reason, AutoPlanReason::BudgetCapped); + assert_eq!(capped.plans.len(), 2); + assert_eq!(capped.planned_input_bytes, 30); + assert_eq!(capped.planned_input_files, 2); + assert_eq!(capped.rewrite_ratio, 0.3); + } + + #[test] + fn test_cap_report_plans_keeps_reason_when_within_budget() { + let report = AutoPlanReport { + selected_strategy: Some(AutoSelectedStrategy::FilesWithDeletes), + plans: vec![make_plan(10, "a.parquet")], + planned_input_bytes: 10, + planned_input_files: 1, + rewrite_ratio: 0.1, + reason: AutoPlanReason::Recommended, + }; + + let capped = + AutoCompactionPlanner::cap_report_plans(report, 100, NonZeroUsize::new(1).unwrap()); + assert_eq!(capped.reason, AutoPlanReason::Recommended); + assert_eq!(capped.plans.len(), 1); + assert_eq!(capped.planned_input_bytes, 10); + assert_eq!(capped.planned_input_files, 1); + } + + #[test] + fn test_select_report_prefers_delete_plan() { + let delete_report = Some(make_report( + AutoSelectedStrategy::FilesWithDeletes, + &[10], + 100, + )); + let small_report = Some(make_report(AutoSelectedStrategy::SmallFiles, &[20], 100)); + + let selected = AutoCompactionPlanner::select_report( + delete_report, + 
small_report, + 100, + NonZeroUsize::new(10).unwrap(), + ); + assert_eq!( + selected.selected_strategy, + Some(AutoSelectedStrategy::FilesWithDeletes) + ); + assert_eq!(selected.plans.len(), 1); + assert_eq!(selected.reason, AutoPlanReason::Recommended); + } + + #[test] + fn test_select_report_falls_back_to_small_plan() { + let delete_report = Some(AutoPlanReport::empty(AutoPlanReason::NoPlansProduced)); + let small_report = Some(make_report(AutoSelectedStrategy::SmallFiles, &[20], 100)); + + let selected = AutoCompactionPlanner::select_report( + delete_report, + small_report, + 100, + NonZeroUsize::new(10).unwrap(), + ); + assert_eq!( + selected.selected_strategy, + Some(AutoSelectedStrategy::SmallFiles) + ); + assert_eq!(selected.plans.len(), 1); + assert_eq!(selected.reason, AutoPlanReason::Recommended); + } + + #[test] + fn test_select_report_returns_empty_when_all_candidates_are_empty() { + let selected = AutoCompactionPlanner::select_report( + Some(AutoPlanReport::empty(AutoPlanReason::NoPlansProduced)), + Some(AutoPlanReport::empty(AutoPlanReason::NoPlansProduced)), + 100, + NonZeroUsize::new(10).unwrap(), + ); + + assert!(selected.plans.is_empty()); + assert_eq!(selected.selected_strategy, None); + assert_eq!(selected.reason, AutoPlanReason::NoPlansProduced); + } +} diff --git a/core/src/compaction/mod.rs b/core/src/compaction/mod.rs index b8369c0..09f0fcf 100644 --- a/core/src/compaction/mod.rs +++ b/core/src/compaction/mod.rs @@ -41,7 +41,10 @@ use crate::{CompactionConfig, CompactionError, CompactionExecutor, Result}; pub mod auto; mod validator; -pub use auto::{AutoCompaction, AutoCompactionBuilder, AutoCompactionPlanner}; +pub use auto::{ + AutoCompaction, AutoCompactionBuilder, AutoCompactionPlanner, AutoPlanReason, AutoPlanReport, + AutoSelectedStrategy, +}; /// Validates that all rewrite results target the same snapshot and branch. 
/// diff --git a/core/src/config/mod.rs b/core/src/config/mod.rs index 3a34912..bbce90c 100644 --- a/core/src/config/mod.rs +++ b/core/src/config/mod.rs @@ -16,6 +16,8 @@ //! Compaction configuration types and constants. +use std::num::NonZeroUsize; + use derive_builder::Builder; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; @@ -41,6 +43,7 @@ pub const DEFAULT_ENABLE_PREFETCH: bool = false; // default setting for prefetch // Auto compaction defaults pub const DEFAULT_MIN_SMALL_FILES_COUNT: usize = 5; pub const DEFAULT_MIN_FILES_WITH_DELETES_COUNT: usize = 1; +pub const DEFAULT_MAX_AUTO_PLANS_PER_RUN: NonZeroUsize = NonZeroUsize::MAX; // Strategy configuration defaults pub const DEFAULT_TARGET_GROUP_SIZE: u64 = 100 * 1024 * 1024 * 1024; // 100GB - BinPack target size @@ -372,7 +375,7 @@ pub struct CompactionExecutionConfig { /// (`plan_compaction()` → `rewrite_plan()` → `commit_rewrite_results()`) manages /// concurrency externally. /// - /// Theoretical max parallelism = `max_parallelism` × `max_concurrent_compaction_plans`. + /// Theoretical max read parallelism = `max_input_parallelism` × `max_concurrent_compaction_plans`. /// Actual parallelism is typically lower due to per-plan heuristics. #[builder(default = "DEFAULT_MAX_CONCURRENT_COMPACTION_PLANS")] pub max_concurrent_compaction_plans: usize, @@ -436,27 +439,20 @@ impl Default for CompactionConfig { pub struct AutoThresholds { /// Minimum small file count to trigger `SmallFiles` strategy. pub min_small_files_count: usize, - /// Minimum delete file count to trigger `FilesWithDeletes` strategy. - pub min_files_with_deletes_count: usize, - /// Minimum impact ratio (fraction of total files). None = disabled. - pub min_impact_ratio: Option, + /// Minimum delete-heavy data file count to trigger `FilesWithDeletes` strategy. 
+ pub min_delete_heavy_files_count: usize, } impl Default for AutoThresholds { fn default() -> Self { Self { min_small_files_count: DEFAULT_MIN_SMALL_FILES_COUNT, - min_files_with_deletes_count: DEFAULT_MIN_FILES_WITH_DELETES_COUNT, - min_impact_ratio: None, + min_delete_heavy_files_count: DEFAULT_MIN_FILES_WITH_DELETES_COUNT, } } } -// TODO: Consider supporting custom strategy order in the future. - /// Automatic strategy selection based on snapshot statistics. -/// -/// Priority: `FilesWithDeletes` → `SmallFiles` → `Full` (if enabled). #[derive(Builder, Debug, Clone)] #[builder(setter(into, strip_option))] pub struct AutoCompactionConfig { @@ -464,10 +460,6 @@ pub struct AutoCompactionConfig { #[builder(default)] pub thresholds: AutoThresholds, - /// Fallback to Full when no specialized strategy matches. - #[builder(default = "true")] - pub enable_full_fallback: bool, - /// Common planning parameters applied to all selected strategies #[builder(default = "DEFAULT_TARGET_FILE_SIZE")] pub target_file_size_bytes: u64, @@ -501,72 +493,76 @@ pub struct AutoCompactionConfig { #[builder(default = "DEFAULT_MIN_DELETE_FILE_COUNT_THRESHOLD")] pub min_delete_file_count_threshold: usize, + /// Maximum number of compaction plans to execute per auto-compaction run. + /// Defaults to unlimited. + #[builder(default = "DEFAULT_MAX_AUTO_PLANS_PER_RUN")] + pub max_auto_plans_per_run: NonZeroUsize, + #[builder(default)] pub execution: CompactionExecutionConfig, } impl AutoCompactionConfig { - /// Selects strategy based on snapshot statistics. 
- pub fn resolve(&self, stats: &SnapshotStats) -> Option { + pub(crate) fn files_with_deletes_candidate( + &self, + stats: &SnapshotStats, + ) -> Option { if stats.total_data_files <= 1 { return None; } - let total = stats.total_data_files as f64; - - if stats.files_with_deletes_count >= self.thresholds.min_files_with_deletes_count - && self - .thresholds - .min_impact_ratio - .is_none_or(|min| stats.files_with_deletes_count as f64 / total >= min) + if self.min_delete_file_count_threshold == 0 + || self.thresholds.min_delete_heavy_files_count == 0 { - return Some(CompactionPlanningConfig::FilesWithDeletes( + return None; + } + + if stats.delete_heavy_files_count >= self.thresholds.min_delete_heavy_files_count { + Some(CompactionPlanningConfig::FilesWithDeletes( FilesWithDeletesConfig { target_file_size_bytes: self.target_file_size_bytes, min_size_per_partition: self.min_size_per_partition, max_file_count_per_partition: self.max_file_count_per_partition, + max_input_parallelism: self.max_input_parallelism, + max_output_parallelism: self.max_output_parallelism, enable_heuristic_output_parallelism: self.enable_heuristic_output_parallelism, grouping_strategy: self.grouping_strategy.clone(), min_delete_file_count_threshold: self.min_delete_file_count_threshold, group_filters: self.group_filters.clone(), - max_input_parallelism: self.max_input_parallelism, - max_output_parallelism: self.max_output_parallelism, }, - )); + )) + } else { + None } + } - if stats.small_files_count >= self.thresholds.min_small_files_count - && self - .thresholds - .min_impact_ratio - .is_none_or(|min| stats.small_files_count as f64 / total >= min) - { - return Some(CompactionPlanningConfig::SmallFiles(SmallFilesConfig { - target_file_size_bytes: self.target_file_size_bytes, - min_size_per_partition: self.min_size_per_partition, - max_file_count_per_partition: self.max_file_count_per_partition, - max_input_parallelism: self.max_input_parallelism, - max_output_parallelism: 
self.max_output_parallelism, - enable_heuristic_output_parallelism: self.enable_heuristic_output_parallelism, - small_file_threshold_bytes: self.small_file_threshold_bytes, - grouping_strategy: self.grouping_strategy.clone(), - group_filters: self.group_filters.clone(), - })); + pub(crate) fn small_files_candidate( + &self, + stats: &SnapshotStats, + ) -> Option { + if stats.total_data_files <= 1 { + return None; + } + + if self.thresholds.min_small_files_count == 0 { + return None; } - if self.enable_full_fallback { - return Some(CompactionPlanningConfig::Full(FullCompactionConfig { + if stats.small_files_count >= self.thresholds.min_small_files_count { + Some(CompactionPlanningConfig::SmallFiles(SmallFilesConfig { target_file_size_bytes: self.target_file_size_bytes, min_size_per_partition: self.min_size_per_partition, max_file_count_per_partition: self.max_file_count_per_partition, max_input_parallelism: self.max_input_parallelism, max_output_parallelism: self.max_output_parallelism, enable_heuristic_output_parallelism: self.enable_heuristic_output_parallelism, + small_file_threshold_bytes: self.small_file_threshold_bytes, grouping_strategy: self.grouping_strategy.clone(), - })); + group_filters: self.group_filters.clone(), + })) + } else { + None } - - None } } @@ -586,136 +582,118 @@ mod tests { fn create_test_stats( total_data_files: usize, small_files: usize, - files_with_deletes: usize, + delete_heavy_files: usize, ) -> SnapshotStats { SnapshotStats { total_data_files, small_files_count: small_files, - files_with_deletes_count: files_with_deletes, + delete_heavy_files_count: delete_heavy_files, } } #[test] - fn test_resolve_strategy_priority() { + fn test_files_with_deletes_candidate_threshold() { let config = AutoCompactionConfigBuilder::default() .thresholds(AutoThresholds { - min_files_with_deletes_count: 3, + min_delete_heavy_files_count: 3, min_small_files_count: 5, - min_impact_ratio: None, }) .build() .unwrap(); - // Priority 1: FilesWithDeletes wins 
when both thresholds met + // Threshold met -> delete candidate is available. let stats = create_test_stats(10, 6, 4); assert!(matches!( - config.resolve(&stats).unwrap(), + config.files_with_deletes_candidate(&stats).unwrap(), CompactionPlanningConfig::FilesWithDeletes(_) )); - // Priority 2: SmallFiles when only it meets threshold + // Below delete threshold -> no delete candidate let stats = create_test_stats(10, 6, 2); - assert!(matches!( - config.resolve(&stats).unwrap(), - CompactionPlanningConfig::SmallFiles(_) - )); - - // Priority 3: Full when no threshold met but fallback enabled - let stats = create_test_stats(10, 2, 1); - assert!(matches!( - config.resolve(&stats).unwrap(), - CompactionPlanningConfig::Full(_) - )); + assert!(config.files_with_deletes_candidate(&stats).is_none()); } #[test] - fn test_resolve_returns_none() { - let config = AutoCompactionConfigBuilder::default() - .enable_full_fallback(false) - .build() - .unwrap(); + fn test_candidates_return_none_for_small_tables() { + let config = AutoCompactionConfigBuilder::default().build().unwrap(); // Empty table - assert!(config.resolve(&create_test_stats(0, 0, 0)).is_none()); + assert!( + config + .files_with_deletes_candidate(&create_test_stats(0, 0, 0)) + .is_none() + ); + assert!( + config + .small_files_candidate(&create_test_stats(0, 0, 0)) + .is_none() + ); // Single file - assert!(config.resolve(&create_test_stats(1, 0, 0)).is_none()); - - // Multiple files but no threshold met and fallback disabled - assert!(config.resolve(&create_test_stats(5, 2, 0)).is_none()); + assert!( + config + .files_with_deletes_candidate(&create_test_stats(1, 0, 0)) + .is_none() + ); + assert!( + config + .small_files_candidate(&create_test_stats(1, 0, 0)) + .is_none() + ); } #[test] - fn test_resolve_fallback_behavior() { - // Use stats that don't meet default thresholds - let stats = create_test_stats(10, 2, 0); - - // Fallback enabled -> Full - let config = AutoCompactionConfigBuilder::default() - 
.enable_full_fallback(true) - .build() - .unwrap(); - assert!(matches!( - config.resolve(&stats).unwrap(), - CompactionPlanningConfig::Full(_) - )); - - // Fallback disabled -> None - let config = AutoCompactionConfigBuilder::default() - .enable_full_fallback(false) - .build() - .unwrap(); - assert!(config.resolve(&stats).is_none()); + fn test_auto_default_budget_is_unbounded() { + let config = AutoCompactionConfig::default(); + assert_eq!(config.max_auto_plans_per_run, NonZeroUsize::MAX); } #[test] - fn test_resolve_impact_ratio() { + fn test_candidates_ignore_table_wide_ratio() { let config = AutoCompactionConfigBuilder::default() .thresholds(AutoThresholds { - min_files_with_deletes_count: 5, + min_delete_heavy_files_count: 5, min_small_files_count: 5, - min_impact_ratio: Some(0.10), }) - .enable_full_fallback(true) .build() .unwrap(); - // Low impact (0.5%) -> fallback to Full + // Low table-wide ratio still qualifies once absolute threshold is met. let stats = create_test_stats(10000, 100, 50); assert!(matches!( - config.resolve(&stats).unwrap(), - CompactionPlanningConfig::Full(_) + config.files_with_deletes_candidate(&stats).unwrap(), + CompactionPlanningConfig::FilesWithDeletes(_) )); - - // High impact (80%) -> use specialized strategy - let stats = create_test_stats(1000, 100, 800); assert!(matches!( - config.resolve(&stats).unwrap(), - CompactionPlanningConfig::FilesWithDeletes(_) + config.small_files_candidate(&stats).unwrap(), + CompactionPlanningConfig::SmallFiles(_) )); - // At boundary (10%) -> use specialized strategy - let stats = create_test_stats(100, 5, 10); + // Larger counts continue to qualify. + let stats = create_test_stats(1000, 0, 400); assert!(matches!( - config.resolve(&stats).unwrap(), + config.files_with_deletes_candidate(&stats).unwrap(), CompactionPlanningConfig::FilesWithDeletes(_) )); - // Below boundary (9%) -> fallback to Full - let stats = create_test_stats(100, 5, 9); + // Absolute threshold still controls eligibility. 
+ let stats = create_test_stats(100, 0, 10); assert!(matches!( - config.resolve(&stats).unwrap(), - CompactionPlanningConfig::Full(_) + config.files_with_deletes_candidate(&stats).unwrap(), + CompactionPlanningConfig::FilesWithDeletes(_) )); + + // Below threshold -> no candidate + let stats = create_test_stats(100, 0, 4); + assert!(config.files_with_deletes_candidate(&stats).is_none()); } #[test] - fn test_resolve_threshold_boundaries() { + fn test_small_files_candidate_threshold_boundaries() { let config = AutoCompactionConfigBuilder::default() .thresholds(AutoThresholds { - min_files_with_deletes_count: 3, + min_delete_heavy_files_count: 3, min_small_files_count: 5, - min_impact_ratio: None, }) .build() .unwrap(); @@ -723,45 +701,115 @@ mod tests { // At delete threshold (exactly 3) let stats = create_test_stats(10, 0, 3); assert!(matches!( - config.resolve(&stats).unwrap(), + config.files_with_deletes_candidate(&stats).unwrap(), CompactionPlanningConfig::FilesWithDeletes(_) )); // At small files threshold (exactly 5) let stats = create_test_stats(10, 5, 2); assert!(matches!( - config.resolve(&stats).unwrap(), + config.small_files_candidate(&stats).unwrap(), CompactionPlanningConfig::SmallFiles(_) )); - // Below both thresholds + // Below small-files threshold let stats = create_test_stats(10, 2, 1); - assert!(matches!( - config.resolve(&stats).unwrap(), - CompactionPlanningConfig::Full(_) - )); + assert!(config.small_files_candidate(&stats).is_none()); } #[test] - fn test_resolve_propagates_config() { + fn test_delete_candidate_propagates_config() { let config = AutoCompactionConfigBuilder::default() .target_file_size_bytes(1_000_000_u64) .max_input_parallelism(8_usize) + .max_output_parallelism(6_usize) .thresholds(AutoThresholds { - min_files_with_deletes_count: 2, + min_delete_heavy_files_count: 2, min_small_files_count: 10, - min_impact_ratio: None, }) .build() .unwrap(); let stats = create_test_stats(10, 1, 3); - let 
CompactionPlanningConfig::FilesWithDeletes(cfg) = config.resolve(&stats).unwrap() + let CompactionPlanningConfig::FilesWithDeletes(cfg) = + config.files_with_deletes_candidate(&stats).unwrap() else { panic!("Expected FilesWithDeletes"); }; assert_eq!(cfg.target_file_size_bytes, 1_000_000); assert_eq!(cfg.max_input_parallelism, 8); + assert_eq!(cfg.max_output_parallelism, 6); + } + + #[test] + fn test_files_with_deletes_candidate_preserves_group_filters() { + let config = AutoCompactionConfigBuilder::default() + .group_filters(GroupFilters { + min_group_size_bytes: Some(123_u64), + min_group_file_count: Some(7_usize), + }) + .thresholds(AutoThresholds { + min_delete_heavy_files_count: 1, + min_small_files_count: usize::MAX, + }) + .build() + .unwrap(); + + let stats = create_test_stats(10, 0, 1); + let CompactionPlanningConfig::FilesWithDeletes(cfg) = + config.files_with_deletes_candidate(&stats).unwrap() + else { + panic!("Expected FilesWithDeletes"); + }; + + let gf = cfg + .group_filters + .expect("Auto should propagate group filters"); + assert_eq!(gf.min_group_size_bytes, Some(123_u64)); + assert_eq!(gf.min_group_file_count, Some(7_usize)); + } + + #[test] + fn test_files_with_deletes_candidate_is_disabled_when_delete_threshold_is_zero() { + let config = AutoCompactionConfigBuilder::default() + .min_delete_file_count_threshold(0_usize) + .thresholds(AutoThresholds { + min_delete_heavy_files_count: 1, + min_small_files_count: usize::MAX, + }) + .build() + .unwrap(); + + let stats = create_test_stats(10, 0, 10); + assert!(config.files_with_deletes_candidate(&stats).is_none()); + } + + #[test] + fn test_files_with_deletes_candidate_is_disabled_when_auto_threshold_is_zero() { + let config = AutoCompactionConfigBuilder::default() + .thresholds(AutoThresholds { + min_delete_heavy_files_count: 0, + min_small_files_count: usize::MAX, + }) + .build() + .unwrap(); + + let stats = create_test_stats(10, 0, 10); + 
assert!(config.files_with_deletes_candidate(&stats).is_none()); + } + + #[test] + fn test_small_files_candidate_is_disabled_when_auto_threshold_is_zero() { + let config = AutoCompactionConfigBuilder::default() + .thresholds(AutoThresholds { + min_delete_heavy_files_count: usize::MAX, + min_small_files_count: 0, + }) + .build() + .unwrap(); + + let stats = create_test_stats(10, 10, 0); + assert!(config.small_files_candidate(&stats).is_none()); } } diff --git a/core/src/file_selection/mod.rs b/core/src/file_selection/mod.rs index 90ac1c5..707a94b 100644 --- a/core/src/file_selection/mod.rs +++ b/core/src/file_selection/mod.rs @@ -27,7 +27,7 @@ pub mod strategy; pub struct SnapshotStats { pub total_data_files: usize, pub small_files_count: usize, - pub files_with_deletes_count: usize, + pub delete_heavy_files_count: usize, } pub use packer::ListPacker; pub use strategy::{FileGroup, PlanStrategy}; diff --git a/core/src/file_selection/strategy.rs b/core/src/file_selection/strategy.rs index 6b6f71f..8a58f30 100644 --- a/core/src/file_selection/strategy.rs +++ b/core/src/file_selection/strategy.rs @@ -487,7 +487,10 @@ impl std::fmt::Display for BinPackGroupingStrategy { /// File filter by size threshold. /// -/// Filters by `task.length`. Bounds are inclusive. If both `None`, passes all files. +/// Filters by `task.length`. +/// +/// `min_size` is inclusive and `max_size` is exclusive. If both are `None`, +/// passes all files. 
#[derive(Debug)] pub struct SizeFilterStrategy { pub min_size: Option, @@ -501,9 +504,9 @@ impl FileFilterStrategy for SizeFilterStrategy { .filter(|task| { let file_size = task.length; match (self.min_size, self.max_size) { - (Some(min), Some(max)) => file_size >= min && file_size <= max, + (Some(min), Some(max)) => file_size >= min && file_size < max, (Some(min), None) => file_size >= min, - (None, Some(max)) => file_size <= max, + (None, Some(max)) => file_size < max, (None, None) => true, } }) @@ -1109,7 +1112,7 @@ mod tests { assert_eq!(strategy.to_string(), expected_desc); } - // Test normal range filtering (5-50MB) + // Test normal range filtering [5MB, 50MB) let strategy = SizeFilterStrategy { min_size: Some(5 * 1024 * 1024), max_size: Some(50 * 1024 * 1024), @@ -1137,22 +1140,17 @@ mod tests { ]; let result: Vec = strategy.filter(test_files); - assert_eq!(result.len(), 4); + assert_eq!(result.len(), 3); TestUtils::assert_paths_eq( - &[ - "min_edge.parquet", - "medium1.parquet", - "medium2.parquet", - "max_edge.parquet", - ], + &["min_edge.parquet", "medium1.parquet", "medium2.parquet"], &result, ); for file in &result { - assert!(file.length >= 5 * 1024 * 1024 && file.length <= 50 * 1024 * 1024); + assert!(file.length >= 5 * 1024 * 1024 && file.length < 50 * 1024 * 1024); } - // Test min = max (exact match only) + // Test min = max (empty range because max is exclusive) let exact_strategy = SizeFilterStrategy { min_size: Some(10 * 1024 * 1024), max_size: Some(10 * 1024 * 1024), @@ -1169,8 +1167,7 @@ mod tests { .build(), ]; let result = exact_strategy.filter(test_files); - assert_eq!(result.len(), 1); - assert_eq!(result[0].data_file_path, "exact.parquet"); + assert_eq!(result.len(), 0); // Test min > max (invalid range - should return empty) let invalid_strategy = SizeFilterStrategy { diff --git a/docs/compaction-strategy-contract.md b/docs/compaction-strategy-contract.md new file mode 100644 index 0000000..e81cba2 --- /dev/null +++ 
b/docs/compaction-strategy-contract.md @@ -0,0 +1,149 @@ +# Compaction Design + +This document describes the current design boundaries of `Full`, `SmallFiles`, `FilesWithDeletes`, and `Auto`, as well as the responsibility split between `Auto` and external callers. + +## Goals + +1. Users should only need to call `Auto`, without manually selecting a compaction type. +2. `Auto` should make decisions based only on the current snapshot, without assuming the caller knows historical execution state. +3. `Auto` should prefer localized rewrites with explicit candidate sets. +4. `Auto` should not rewrite healthy files across the whole table by default. +5. The amount of work performed by a single `Auto` run must be bounded. + +## Terms + +- `data file`: a `FileScanTask` where `data_file_content == Data` +- `delete-heavy`: when `min_delete_file_count_threshold > 0`, `deletes.len() >= min_delete_file_count_threshold` +- `candidate set`: the set of data files that a strategy is allowed to include in compaction +- `group gating`: group-level thresholds used to avoid frequent small rewrites +- `plan budget`: the maximum number of plans that `Auto` is allowed to execute in a single run +- `fixed-point rewrite`: for the input files rewritten in the current run, the newly committed snapshot should cause them to leave that strategy's candidate set + +## Strategy Model + +### `Full` + +- Intended use: explicit/manual full-table rewrite +- Candidate set: all data files +- Does not need to be fixed-point +- Is not used as an `Auto` fallback + +### `SmallFiles` + +- Intended use: append-only or general size-based compaction +- Candidate set: `file_size < small_file_threshold_bytes` +- May use `group_filters` for group gating +- Must be fixed-point: rewritten input files that reach the target threshold should leave the candidate set in the newly committed snapshot + +### `FilesWithDeletes` + +- Intended use: timely cleanup of delete-heavy files +- Candidate set: `deletes.len() >= 
min_delete_file_count_threshold` +- May use `group_filters` for group gating +- `Auto` does not rewrite or override caller-provided group gating for this strategy +- Under `Auto`, `min_delete_file_count_threshold == 0` disables delete-heavy detection and therefore disables this candidate +- Must be fixed-point: rewritten delete-heavy input files should leave the candidate set in the newly committed snapshot + +## `Auto` Planner + +`Auto` only chooses between two localized rewrite strategies: `FilesWithDeletes` and `SmallFiles`. + +Zero-valued auto thresholds are treated as disabled for the corresponding candidate. + +Within a single scan, `Auto` produces two candidate plan sets: + +1. `FilesWithDeletes` plan +2. `SmallFiles` plan + +It then applies the following fixed decision order: + +1. If the delete plan is non-empty, select `FilesWithDeletes` +2. Otherwise, if the small-files plan is non-empty, select `SmallFiles` +3. Apply `max_auto_plans_per_run` uniformly to the selected plan set +4. If no strategy is selected, return an empty result + +Design focus: + +- Only choose localized rewrite strategies +- Apply a uniform budget to the final selected plan set + +## Why `Auto` Does Not Fall Back to `Full` + +Both `SmallFiles` and `FilesWithDeletes` have explicit candidate sets. After successful execution, those rewritten files usually leave the candidate set, so repeated high-frequency invocations tend to converge naturally. + +`Full` does not have this property. Its candidate set is the entire table. If it were used as a normal `Auto` fallback, frequent invocations could repeatedly rewrite healthy parquet files that are already close to `target_file_size`. + +For that reason, `Auto` does not introduce a full-like special case and does not use `Full` as a fallback path. + +## Planner Budget + +`max_auto_plans_per_run` is planner-level configuration, not external invocation policy. Its default is unlimited. 
+It is represented as a positive integer budget; zero is not a valid configuration value. + +Rationale: + +- The planner directly returns executable plans, so budget enforcement should happen inside the planner +- `planned_input_bytes`, `planned_input_files`, `rewrite_ratio`, and `reason` stay consistent with the final returned plan set +- It avoids forcing upper layers to recalculate report fields after trimming plans again + +The current budget unit is `plan count`, not input bytes. This assumes grouping already keeps the size of each individual plan within a reasonable range. + +## Return Semantics + +### Non-Empty Results + +- `Recommended`: safe to execute by default +- `BudgetCapped`: a candidate strategy was selected, and the returned plans are the subset that fit within the configured budget + +### Empty Results + +- `NoCandidate`: no strategy threshold was met +- `NoPlansProduced`: a strategy threshold was met, but all plans were filtered out by group gating +- `NoSnapshot`: the target branch does not have an associated snapshot + +### Report Path + +The report result retains the following fields: + +- `selected_strategy` +- `plans` +- `planned_input_bytes` +- `planned_input_files` +- `rewrite_ratio` +- `reason` + +The `reason` in the report must match the final returned plan set. In other words, if the planner caps the plan set due to budget, `reason` must describe the capped result rather than the pre-cap candidate state. + +## High-Frequency Invocation Boundaries + +The current design only guarantees two things: + +1. Selective paths will try to converge naturally +2. 
The work done by a single `Auto` run is bounded by the planner budget + +The current design does not guarantee: + +- cooldown +- deduplication of repeated invocations against the same snapshot +- "already executed within the last N minutes" +- throttling based on historical execution state + +The reason is straightforward: those signals are outside the snapshot-local view of the current compaction planner. + +## Responsibility Split Between the Library and External Systems + +### Responsibilities of the Library + +- Scan the current snapshot and produce candidate plans +- Choose between `FilesWithDeletes` and `SmallFiles` +- Apply `max_auto_plans_per_run` +- Return a report that is consistent with the final returned plan set + +### Responsibilities of External Systems + +- Decide when to call `Auto` +- Decide whether snapshot age or snapshot count should gate triggering +- Decide whether repeated calls against the same snapshot should be skipped +- Implement cooldown or other cross-invocation throttling policies + +Here, "external systems" means callers or scheduling infrastructure outside the compaction library. The current library provides planning and execution, but does not provide a built-in scheduler.