From 718634860aa294855dad2c708377fa83ecb6dcef Mon Sep 17 00:00:00 2001 From: nabenabe0928 Date: Mon, 12 Apr 2021 20:03:19 +0900 Subject: [PATCH 1/9] [refactor] Separate processing inside BaseTask._search() function Since the readability of the function was quite bad, I separated the processing. I mostly copied and pasted the original processing and checked if we can run some examples. --- autoPyTorch/api/base_task.py | 329 ++++++++++++++++++++--------------- 1 file changed, 184 insertions(+), 145 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index 8f4dbe9e6..d571e8a66 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -486,7 +486,7 @@ def _load_best_individual_model(self) -> SingleBest: return ensemble - def _do_dummy_prediction(self, num_run: int) -> None: + def _do_dummy_prediction(self) -> None: assert self._metric is not None assert self._logger is not None @@ -510,14 +510,14 @@ def _do_dummy_prediction(self, num_run: int) -> None: logger_port=self._logger_port, cost_for_crash=get_cost_of_crash(self._metric), abort_on_first_run_crash=False, - initial_num_run=num_run, + initial_num_run=self.num_run, stats=stats, memory_limit=memory_limit, disable_file_output=True if len(self._disable_file_output) > 0 else False, all_supported_metrics=self._all_supported_metrics ) - status, cost, runtime, additional_info = ta.run(num_run, cutoff=self._time_for_task) + status, cost, runtime, additional_info = ta.run(self.num_run, cutoff=self._time_for_task) if status == StatusType.SUCCESS: self._logger.info("Finished creating dummy predictions.") else: @@ -551,8 +551,8 @@ def _do_dummy_prediction(self, num_run: int) -> None: % (str(status), str(additional_info)) ) - def _do_traditional_prediction(self, num_run: int, time_left: int, func_eval_time_limit_secs: int - ) -> int: + def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: int + ) -> None: """ Fits traditional machine learning algorithms to the provided dataset, while complying with time resource allocation. @@ -582,14 +582,16 @@ def _do_traditional_prediction(self, num_run: int, time_left: int, func_eval_tim assert self._logger is not None assert self._dask_client is not None + self.num_run += 1 + memory_limit = self._memory_limit if memory_limit is not None: memory_limit = int(math.ceil(memory_limit)) available_classifiers = get_available_classifiers() dask_futures = [] - total_number_classifiers = len(available_classifiers) + num_run - for n_r, classifier in enumerate(available_classifiers, start=num_run): + total_number_classifiers = len(available_classifiers) + self.num_run + for n_r, classifier in enumerate(available_classifiers, start=self.num_run): # Only launch a task if there is time start_time = time.time() @@ -623,7 +625,7 @@ def _do_traditional_prediction(self, num_run: int, time_left: int, func_eval_tim ]) # Increment the launched job index - num_run = n_r + self.num_run = n_r # When managing time, we need to take into account the allocated time resources, # which are dependent on the number of cores. 
'dask_futures' is a proxy to the number @@ -677,7 +679,157 @@ def _do_traditional_prediction(self, num_run: int, time_left: int, func_eval_tim "Please consider increasing the run time to further improve performance.") break - return num_run + def _run_dummy_predictions(self) -> None: + dummy_task_name = 'runDummy' + self._stopwatch.start_task(dummy_task_name) + self._do_dummy_prediction() + self._stopwatch.stop_task(dummy_task_name) + + def _run_traditional_ml(self, + enable_traditional_pipeline: bool, + func_eval_time_limit_secs: Optional[int] = None) -> int: + """We would like to obtain training time for at least 1 Neural network in SMAC""" + + if enable_traditional_pipeline: + if STRING_TO_TASK_TYPES[self.task_type] in REGRESSION_TASKS: + self._logger.warning("Traditional Pipeline is not enabled for regression. Skipping...") + else: + traditional_task_name = 'runTraditional' + self._stopwatch.start_task(traditional_task_name) + elapsed_time = self._stopwatch.wall_elapsed(self.dataset_name) + + time_for_traditional = int( + self._time_for_task - elapsed_time - func_eval_time_limit_secs + ) + self.num_run = self._do_traditional_prediction( + func_eval_time_limit_secs=func_eval_time_limit_secs, + time_left=time_for_traditional, + ) + self._stopwatch.stop_task(traditional_task_name) + + def _run_ensemble(self, + dataset: BaseDataset, + optimize_metric: str, + total_walltime_limit: int, + precision: int) -> EnsembleBuilderManager: + + elapsed_time = self._stopwatch.wall_elapsed(self.dataset_name) + time_left_for_ensembles = max(0, total_walltime_limit - elapsed_time) + proc_ensemble = None + if time_left_for_ensembles <= 0 and self.ensemble_size > 0: + raise ValueError("Could not run ensemble builder because there " + "is no time left. Try increasing the value " + "of total_walltime_limit.") + elif self.ensemble_size <= 0: + self._logger.info("Could not run ensemble builder as ensemble size is non-positive.") + else: + self._logger.info("Run ensemble") + ensemble_task_name = 'ensemble' + self._stopwatch.start_task(ensemble_task_name) + proc_ensemble = EnsembleBuilderManager( + start_time=time.time(), + time_left_for_ensembles=time_left_for_ensembles, + backend=copy.deepcopy(self._backend), + dataset_name=dataset.dataset_name, + output_type=STRING_TO_OUTPUT_TYPES[dataset.output_type], + task_type=STRING_TO_TASK_TYPES[self.task_type], + metrics=[self._metric], + opt_metric=optimize_metric, + ensemble_size=self.ensemble_size, + ensemble_nbest=self.ensemble_nbest, + max_models_on_disc=self.max_models_on_disc, + seed=self.seed, + max_iterations=None, + read_at_most=sys.maxsize, + ensemble_memory_limit=self._memory_limit, + random_state=self.seed, + precision=precision, + logger_port=self._logger_port, + ) + self._stopwatch.stop_task(ensemble_task_name) + + return proc_ensemble + + def _get_budget_config(self, + budget_type: Optional[str] = None, + budget: Optional[float] = None) -> Dict[str, Union[float, str]]: + + budget_config: Dict[str, Union[float, str]] = {} + if budget_type is not None and budget is not None: + budget_config['budget_type'] = budget_type + budget_config[budget_type] = budget + elif budget_type is not None or budget is not None: + raise ValueError("budget type was not specified in budget_config") + + return budget_config + + def _start_smac(self, proc_smac: AutoMLSMBO): + try: + self.run_history, self.trajectory, budget_type = \ + proc_smac.run_smbo() + trajectory_filename = os.path.join( + self._backend.get_smac_output_directory_for_run(self.seed), + 'trajectory.json') + 
saveable_trajectory = \ + [list(entry[:2]) + [entry[2].get_dictionary()] + list(entry[3:]) + for entry in self.trajectory] + except Exception as e: + self._logger.exception(str(e)) + raise + else: + try: + with open(trajectory_filename, 'w') as fh: + json.dump(saveable_trajectory, fh) + except Exception as e: + self._logger.warning(f"Could not save {trajectory_filename} due to {e}...") + + def _run_smac(self, + experiment_task_name: str, + dataset: BaseDataset, + proc_ensemble: EnsembleBuilderManager, + total_walltime_limit: int, + budget_type: Optional[str] = None, + budget: Optional[float] = None, + func_eval_time_limit_secs: Optional[int] = None, + get_smac_object_callback: Optional[Callable] = None, + smac_scenario_args: Optional[Dict[str, Any]] = None) -> None: + + smac_task_name = 'runSMAC' + self._stopwatch.start_task(smac_task_name) + elapsed_time = self._stopwatch.wall_elapsed(experiment_task_name) + time_left_for_smac = max(0, total_walltime_limit - elapsed_time) + + self._logger.info(f"Run SMAC with {time_left_for_smac:.2f} sec time left") + if time_left_for_smac <= 0: + self._logger.warning(" Could not run SMAC because there is no time left") + else: + budget_config = self._get_budget_config(budget_type=budget_type, budget=budget) + proc_smac = AutoMLSMBO( + config_space=self.search_space, + dataset_name=dataset.dataset_name, + backend=self._backend, + total_walltime_limit=total_walltime_limit, + func_eval_time_limit_secs=func_eval_time_limit_secs, + dask_client=self._dask_client, + memory_limit=self._memory_limit, + n_jobs=self.n_jobs, + watcher=self._stopwatch, + metric=self._metric, + seed=self.seed, + include=self.include_components, + exclude=self.exclude_components, + disable_file_output=self._disable_file_output, + all_supported_metrics=self._all_supported_metrics, + smac_scenario_args=smac_scenario_args, + get_smac_object_callback=get_smac_object_callback, + pipeline_config={**self.pipeline_options, **budget_config}, + ensemble_callback=proc_ensemble, + logger_port=self._logger_port, + start_num_run=self.num_run, + search_space_updates=self.search_space_updates + ) + + self._start_smac(proc_smac) def _search( self, @@ -775,6 +927,8 @@ def _search( raise ValueError("Incompatible dataset entered for current task," "expected dataset to have task type :{} got " ":{}".format(self.task_type, dataset.task_type)) + if precision not in [16, 32, 64]: + raise ValueError(f"precision must be either [16, 32, 64], but got {precision}") # Initialise information needed for the experiment experiment_task_name = 'runSearch' @@ -803,15 +957,6 @@ def _search( self.search_space = self.get_search_space(dataset) - budget_config: Dict[str, Union[float, str]] = {} - if budget_type is not None and budget is not None: - budget_config['budget_type'] = budget_type - budget_config[budget_type] = budget - elif budget_type is not None or budget is not None: - raise ValueError( - "budget type was not specified in budget_config" - ) - if self.task_type is None: raise ValueError("Cannot interpret task type from the dataset") @@ -846,126 +991,24 @@ def _search( ) ) - # ============> Run dummy predictions - num_run = 1 - dummy_task_name = 'runDummy' - self._stopwatch.start_task(dummy_task_name) - self._do_dummy_prediction(num_run) - self._stopwatch.stop_task(dummy_task_name) - - # ============> Run traditional ml - - if enable_traditional_pipeline: - if STRING_TO_TASK_TYPES[self.task_type] in REGRESSION_TASKS: - self._logger.warning("Traditional Pipeline is not enabled for regression. 
Skipping...") - else: - traditional_task_name = 'runTraditional' - self._stopwatch.start_task(traditional_task_name) - elapsed_time = self._stopwatch.wall_elapsed(self.dataset_name) - # We want time for at least 1 Neural network in SMAC - time_for_traditional = int( - self._time_for_task - elapsed_time - func_eval_time_limit_secs - ) - num_run = self._do_traditional_prediction( - num_run=num_run + 1, func_eval_time_limit_secs=func_eval_time_limit_secs, - time_left=time_for_traditional, - ) - self._stopwatch.stop_task(traditional_task_name) - - # ============> Starting ensemble - elapsed_time = self._stopwatch.wall_elapsed(self.dataset_name) - time_left_for_ensembles = max(0, total_walltime_limit - elapsed_time) - proc_ensemble = None - if time_left_for_ensembles <= 0: - # Fit only raises error when ensemble_size is not zero but - # time_left_for_ensembles is zero. - if self.ensemble_size > 0: - raise ValueError("Not starting ensemble builder because there " - "is no time left. Try increasing the value " - "of time_left_for_this_task.") - elif self.ensemble_size <= 0: - self._logger.info("Not starting ensemble builder as ensemble size is 0") - else: - self._logger.info("Starting ensemble") - ensemble_task_name = 'ensemble' - self._stopwatch.start_task(ensemble_task_name) - proc_ensemble = EnsembleBuilderManager( - start_time=time.time(), - time_left_for_ensembles=time_left_for_ensembles, - backend=copy.deepcopy(self._backend), - dataset_name=dataset.dataset_name, - output_type=STRING_TO_OUTPUT_TYPES[dataset.output_type], - task_type=STRING_TO_TASK_TYPES[self.task_type], - metrics=[self._metric], - opt_metric=optimize_metric, - ensemble_size=self.ensemble_size, - ensemble_nbest=self.ensemble_nbest, - max_models_on_disc=self.max_models_on_disc, - seed=self.seed, - max_iterations=None, - read_at_most=sys.maxsize, - ensemble_memory_limit=self._memory_limit, - random_state=self.seed, - precision=precision, - logger_port=self._logger_port, - ) - self._stopwatch.stop_task(ensemble_task_name) - - # ==> Run SMAC - smac_task_name = 'runSMAC' - self._stopwatch.start_task(smac_task_name) - elapsed_time = self._stopwatch.wall_elapsed(experiment_task_name) - time_left_for_smac = max(0, total_walltime_limit - elapsed_time) + self.num_run = 1 + self._run_dummy_predictions() + self._run_traditional_ml(enable_traditional_pipeline=enable_traditional_pipeline, + func_eval_time_limit_secs=func_eval_time_limit_secs) + proc_ensemble = self._run_ensemble(dataset=dataset, precision=precision, + optimize_metric=optimize_metric, + total_walltime_limit=total_walltime_limit) + + self._run_smac(experiment_task_name=experiment_task_name, + budget=budget, budget_type=budget_type, proc_ensemble=proc_ensemble, + dataset=dataset, total_walltime_limit=total_walltime_limit, + func_eval_time_limit_secs=func_eval_time_limit_secs, + get_smac_object_callback=get_smac_object_callback, + smac_scenario_args=smac_scenario_args) - self._logger.info("Starting SMAC with %5.2f sec time left" % time_left_for_smac) - if time_left_for_smac <= 0: - self._logger.warning(" Not starting SMAC because there is no time left") - else: - - _proc_smac = AutoMLSMBO( - config_space=self.search_space, - dataset_name=dataset.dataset_name, - backend=self._backend, - total_walltime_limit=total_walltime_limit, - func_eval_time_limit_secs=func_eval_time_limit_secs, - dask_client=self._dask_client, - memory_limit=self._memory_limit, - n_jobs=self.n_jobs, - watcher=self._stopwatch, - metric=self._metric, - seed=self.seed, - include=self.include_components, - 
exclude=self.exclude_components, - disable_file_output=self._disable_file_output, - all_supported_metrics=self._all_supported_metrics, - smac_scenario_args=smac_scenario_args, - get_smac_object_callback=get_smac_object_callback, - pipeline_config={**self.pipeline_options, **budget_config}, - ensemble_callback=proc_ensemble, - logger_port=self._logger_port, - start_num_run=num_run, - search_space_updates=self.search_space_updates - ) - try: - self.run_history, self.trajectory, budget_type = \ - _proc_smac.run_smbo() - trajectory_filename = os.path.join( - self._backend.get_smac_output_directory_for_run(self.seed), - 'trajectory.json') - saveable_trajectory = \ - [list(entry[:2]) + [entry[2].get_dictionary()] + list(entry[3:]) - for entry in self.trajectory] - try: - with open(trajectory_filename, 'w') as fh: - json.dump(saveable_trajectory, fh) - except Exception as e: - self._logger.warning(f"Cannot save {trajectory_filename} due to {e}...") - except Exception as e: - self._logger.exception(str(e)) - raise # Wait until the ensemble process is finished to avoid shutting down # while the ensemble builder tries to access the data - self._logger.info("Starting Shutdown") + self._logger.info("Start Shutdown") if proc_ensemble is not None: self.ensemble_performance_history = list(proc_ensemble.history) @@ -1060,12 +1103,12 @@ def refit( for identifier in self.models_: model = self.models_[identifier] - # this updates the model inplace, it can then later be used in + # It updates the model inplace, it can then later be used in # predict method - # try to fit the model. If it fails, shuffle the data. This - # could alleviate the problem in algorithms that depend on - # the ordering of the data. + # Fit the model to check if it fails. + # If it fails, shuffle the data to alleviate + # the ordering-of-the-data issue in algorithms fit_and_suppress_warnings(self._logger, model, X, y=None) self._clean_logger() @@ -1231,15 +1274,11 @@ def __del__(self) -> None: self._backend.context.delete_directories(force=False) @typing.no_type_check - def get_incumbent_results( - self - ): + def get_incumbent_results(self): pass @typing.no_type_check - def get_incumbent_config( - self - ): + def get_incumbent_config(self): pass def get_models_with_weights(self) -> List: From ff1d7d3d07c1eeb3a3b16eeb7e26a6ca42135a86 Mon Sep 17 00:00:00 2001 From: nabenabe0928 Date: Tue, 13 Apr 2021 00:28:22 +0900 Subject: [PATCH 2/9] [refactor] Add _search_settings to make _search() function shorter --- autoPyTorch/api/base_task.py | 227 ++++++++++++++++++++--------------- 1 file changed, 127 insertions(+), 100 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index d571e8a66..13743981e 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -47,7 +47,7 @@ from autoPyTorch.pipeline.components.training.metrics.base import autoPyTorchMetric from autoPyTorch.pipeline.components.training.metrics.utils import calculate_score, get_metrics from autoPyTorch.utils.backend import Backend, create -from autoPyTorch.utils.common import FitRequirement, replace_string_bool_to_bool +from autoPyTorch.utils.common import replace_string_bool_to_bool from autoPyTorch.utils.hyperparameter_search_space_update import HyperparameterSearchSpaceUpdates from autoPyTorch.utils.logging_ import ( PicklableClientLogger, @@ -170,13 +170,14 @@ def __init__( os.path.join(os.path.dirname(__file__), '../configs/default_pipeline_options.json')))) self.search_space: Optional[ConfigurationSpace] = None - 
self._dataset_requirements: Optional[List[FitRequirement]] = None self._metric: Optional[autoPyTorchMetric] = None self._logger: Optional[PicklableClientLogger] = None self.run_history: Optional[RunHistory] = None self.trajectory: Optional[List] = None self.dataset_name: Optional[str] = None self.cv_models_: Dict = {} + self.num_run: int = 1 + self.experiment_task_name: str = 'runSearch' # By default try to use the TCP logging port or get a new port self._logger_port = logging.handlers.DEFAULT_TCP_LOGGING_PORT @@ -687,7 +688,7 @@ def _run_dummy_predictions(self) -> None: def _run_traditional_ml(self, enable_traditional_pipeline: bool, - func_eval_time_limit_secs: Optional[int] = None) -> int: + func_eval_time_limit_secs: Optional[int] = None) -> None: """We would like to obtain training time for at least 1 Neural network in SMAC""" if enable_traditional_pipeline: @@ -784,7 +785,6 @@ def _start_smac(self, proc_smac: AutoMLSMBO): self._logger.warning(f"Could not save {trajectory_filename} due to {e}...") def _run_smac(self, - experiment_task_name: str, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager, total_walltime_limit: int, @@ -796,7 +796,7 @@ def _run_smac(self, smac_task_name = 'runSMAC' self._stopwatch.start_task(smac_task_name) - elapsed_time = self._stopwatch.wall_elapsed(experiment_task_name) + elapsed_time = self._stopwatch.wall_elapsed(self.experiment_task_name) time_left_for_smac = max(0, total_walltime_limit - elapsed_time) self._logger.info(f"Run SMAC with {time_left_for_smac:.2f} sec time left") @@ -831,6 +831,116 @@ def _run_smac(self, self._start_smac(proc_smac) + def _search_settings(self, dataset: BaseDataset, disable_file_output: List, + optimize_metric: str, memory_limit: Optional[int] = 4096, + total_walltime_limit: int = 100, all_supported_metrics: bool = True + ) -> None: + + """Initialise information needed for the experiment""" + self.experiment_task_name = 'runSearch' + dataset_requirements = get_dataset_requirements( + info=self._get_required_dataset_properties(dataset)) + dataset_properties = dataset.get_dataset_properties(dataset_requirements) + + self._stopwatch.start_task(self.experiment_task_name) + self.dataset_name = dataset.dataset_name + self._all_supported_metrics = all_supported_metrics + self._disable_file_output = disable_file_output + self._memory_limit = memory_limit + self._time_for_task = total_walltime_limit + self._metric = get_metrics( + names=[optimize_metric], dataset_properties=dataset_properties)[0] + + if self._logger is None: + self._logger = self._get_logger(self.dataset_name) + + # Save start time to backend + self._backend.save_start_time(str(self.seed)) + self._backend.save_datamanager(dataset) + + # Print debug information to log + self._print_debug_info_to_log() + + self.search_space = self.get_search_space(dataset) + + # If no dask client was provided, we create one, so that we can + # start a ensemble process in parallel to smbo optimize + if ( + self._dask_client is None and (self.ensemble_size > 0 or self.n_jobs is not None and self.n_jobs > 1) + ): + self._create_dask_client() + else: + self._is_dask_client_internally_created = False + + def _adapt_time_resource_allocation(self, + total_walltime_limit: int, + func_eval_time_limit_secs: Optional[int] = None + ) -> int: + + # Handle time resource allocation + elapsed_time = self._stopwatch.wall_elapsed(self.experiment_task_name) + time_left_for_modelfit = int(max(0, total_walltime_limit - elapsed_time)) + if func_eval_time_limit_secs is None or 
func_eval_time_limit_secs > time_left_for_modelfit: + self._logger.warning( + 'Time limit for a single run is higher than total time ' + 'limit. Capping the limit for a single run to the total ' + 'time given to SMAC (%f)' % time_left_for_modelfit + ) + func_eval_time_limit_secs = time_left_for_modelfit + + # Make sure that at least 2 models are created for the ensemble process + num_models = time_left_for_modelfit // func_eval_time_limit_secs + if num_models < 2: + func_eval_time_limit_secs = time_left_for_modelfit // 2 + self._logger.warning( + "Capping the func_eval_time_limit_secs to {} to have " + "time for a least 2 models to ensemble.".format( + func_eval_time_limit_secs + ) + ) + + return func_eval_time_limit_secs + + def _save_ensemble_performance_history(self, proc_ensemble: EnsembleBuilderManager) -> None: + if len(proc_ensemble.futures) > 0: + # Also add ensemble runs that did not finish within smac time + # and add them into the ensemble history + self._logger.info("Ensemble script still running, waiting for it to finish.") + result = proc_ensemble.futures.pop().result() + if result: + ensemble_history, _, _, _ = result + self.ensemble_performance_history.extend(ensemble_history) + self._logger.info("Ensemble script finished, continue shutdown.") + + # save the ensemble performance history file + if len(self.ensemble_performance_history) > 0: + pd.DataFrame(self.ensemble_performance_history).to_json( + os.path.join(self._backend.internals_directory, 'ensemble_history.json')) + + def _finish_experiment(self, proc_ensemble: EnsembleBuilderManager, + load_models: bool) -> None: + + # Wait until the ensemble process is finished to avoid shutting down + # while the ensemble builder tries to access the data + self._logger.info("Start Shutdown") + + if proc_ensemble is not None: + self.ensemble_performance_history = list(proc_ensemble.history) + self._save_ensemble_performance_history(proc_ensemble) + + self._logger.info("Closing the dask infrastructure") + self._close_dask_client() + self._logger.info("Finished closing the dask infrastructure") + + if load_models: + self._logger.info("Loading models...") + self._load_models() + self._logger.info("Finished loading models...") + + # Clean up the logger + self._logger.info("Starting to clean up the logger") + self._clean_logger() + def _search( self, optimize_metric: str, @@ -927,69 +1037,20 @@ def _search( raise ValueError("Incompatible dataset entered for current task," "expected dataset to have task type :{} got " ":{}".format(self.task_type, dataset.task_type)) - if precision not in [16, 32, 64]: - raise ValueError(f"precision must be either [16, 32, 64], but got {precision}") - - # Initialise information needed for the experiment - experiment_task_name = 'runSearch' - dataset_requirements = get_dataset_requirements( - info=self._get_required_dataset_properties(dataset)) - self._dataset_requirements = dataset_requirements - dataset_properties = dataset.get_dataset_properties(dataset_requirements) - self._stopwatch.start_task(experiment_task_name) - self.dataset_name = dataset.dataset_name - if self._logger is None: - self._logger = self._get_logger(self.dataset_name) - self._all_supported_metrics = all_supported_metrics - self._disable_file_output = disable_file_output - self._memory_limit = memory_limit - self._time_for_task = total_walltime_limit - # Save start time to backend - self._backend.save_start_time(str(self.seed)) - - self._backend.save_datamanager(dataset) - - # Print debug information to log - 
self._print_debug_info_to_log() - - self._metric = get_metrics( - names=[optimize_metric], dataset_properties=dataset_properties)[0] - - self.search_space = self.get_search_space(dataset) - if self.task_type is None: raise ValueError("Cannot interpret task type from the dataset") + if precision not in [16, 32, 64]: + raise ValueError(f"precision must be either [16, 32, 64], but got {precision}") - # If no dask client was provided, we create one, so that we can - # start a ensemble process in parallel to smbo optimize - if ( - self._dask_client is None and (self.ensemble_size > 0 or self.n_jobs is not None and self.n_jobs > 1) - ): - self._create_dask_client() - else: - self._is_dask_client_internally_created = False - - # Handle time resource allocation - elapsed_time = self._stopwatch.wall_elapsed(experiment_task_name) - time_left_for_modelfit = int(max(0, total_walltime_limit - elapsed_time)) - if func_eval_time_limit_secs is None or func_eval_time_limit_secs > time_left_for_modelfit: - self._logger.warning( - 'Time limit for a single run is higher than total time ' - 'limit. Capping the limit for a single run to the total ' - 'time given to SMAC (%f)' % time_left_for_modelfit - ) - func_eval_time_limit_secs = time_left_for_modelfit + self._search_settings(dataset=dataset, disable_file_output=disable_file_output, + optimize_metric=optimize_metric, memory_limit=memory_limit, + all_supported_metrics=all_supported_metrics, + total_walltime_limit=total_walltime_limit) - # Make sure that at least 2 models are created for the ensemble process - num_models = time_left_for_modelfit // func_eval_time_limit_secs - if num_models < 2: - func_eval_time_limit_secs = time_left_for_modelfit // 2 - self._logger.warning( - "Capping the func_eval_time_limit_secs to {} to have " - "time for a least 2 models to ensemble.".format( - func_eval_time_limit_secs - ) - ) + func_eval_time_limit_secs = self._adapt_time_resource_allocation( + total_walltime_limit=total_walltime_limit, + func_eval_time_limit_secs=func_eval_time_limit_secs + ) self.num_run = 1 self._run_dummy_predictions() @@ -999,47 +1060,13 @@ def _search( optimize_metric=optimize_metric, total_walltime_limit=total_walltime_limit) - self._run_smac(experiment_task_name=experiment_task_name, - budget=budget, budget_type=budget_type, proc_ensemble=proc_ensemble, + self._run_smac(budget=budget, budget_type=budget_type, proc_ensemble=proc_ensemble, dataset=dataset, total_walltime_limit=total_walltime_limit, func_eval_time_limit_secs=func_eval_time_limit_secs, get_smac_object_callback=get_smac_object_callback, smac_scenario_args=smac_scenario_args) - # Wait until the ensemble process is finished to avoid shutting down - # while the ensemble builder tries to access the data - self._logger.info("Start Shutdown") - - if proc_ensemble is not None: - self.ensemble_performance_history = list(proc_ensemble.history) - - if len(proc_ensemble.futures) > 0: - # Also add ensemble runs that did not finish within smac time - # and add them into the ensemble history - self._logger.info("Ensemble script still running, waiting for it to finish.") - result = proc_ensemble.futures.pop().result() - if result: - ensemble_history, _, _, _ = result - self.ensemble_performance_history.extend(ensemble_history) - self._logger.info("Ensemble script finished, continue shutdown.") - - # save the ensemble performance history file - if len(self.ensemble_performance_history) > 0: - pd.DataFrame(self.ensemble_performance_history).to_json( - os.path.join(self._backend.internals_directory, 
'ensemble_history.json')) - - self._logger.info("Closing the dask infrastructure") - self._close_dask_client() - self._logger.info("Finished closing the dask infrastructure") - - if load_models: - self._logger.info("Loading models...") - self._load_models() - self._logger.info("Finished loading models...") - - # Clean up the logger - self._logger.info("Starting to clean up the logger") - self._clean_logger() + self._finish_experiment(proc_ensemble=proc_ensemble, load_models=load_models) return self From ec1a5083e55ca6cd670384abca2581d1daf720bb Mon Sep 17 00:00:00 2001 From: nabenabe0928 Date: Tue, 13 Apr 2021 00:57:21 +0900 Subject: [PATCH 3/9] [fix] Resolve all the mypy issues --- autoPyTorch/api/base_task.py | 184 ++++++++++++++++------------------- 1 file changed, 82 insertions(+), 102 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index 13743981e..5cc4f233c 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -499,7 +499,7 @@ def _do_dummy_prediction(self) -> None: memory_limit = int(math.ceil(memory_limit)) scenario_mock = unittest.mock.Mock() - scenario_mock.wallclock_limit = self._time_for_task + scenario_mock.wallclock_limit = self._total_walltime_limit # This stats object is a hack - maybe the SMAC stats object should # already be generated here! stats = Stats(scenario_mock) @@ -518,7 +518,7 @@ def _do_dummy_prediction(self) -> None: all_supported_metrics=self._all_supported_metrics ) - status, cost, runtime, additional_info = ta.run(self.num_run, cutoff=self._time_for_task) + status, cost, runtime, additional_info = ta.run(self.num_run, cutoff=self._total_walltime_limit) if status == StatusType.SUCCESS: self._logger.info("Finished creating dummy predictions.") else: @@ -552,8 +552,7 @@ def _do_dummy_prediction(self) -> None: % (str(status), str(additional_info)) ) - def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: int - ) -> None: + def _do_traditional_prediction(self, time_left: int) -> None: """ Fits traditional machine learning algorithms to the provided dataset, while complying with time resource allocation. 
@@ -596,8 +595,8 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: # Only launch a task if there is time start_time = time.time() - if time_left >= func_eval_time_limit_secs: - self._logger.info(f"{n_r}: Started fitting {classifier} with cutoff={func_eval_time_limit_secs}") + if time_left >= self._func_eval_time_limit_secs: + self._logger.info(f"{n_r}: Started fitting {classifier} with cutoff={self._func_eval_time_limit_secs}") scenario_mock = unittest.mock.Mock() scenario_mock.wallclock_limit = time_left # This stats object is a hack - maybe the SMAC stats object should @@ -621,7 +620,7 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: classifier, self._dask_client.submit( ta.run, config=classifier, - cutoff=func_eval_time_limit_secs, + cutoff=self._func_eval_time_limit_secs, ) ]) @@ -640,7 +639,7 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: # How many workers to wait before starting fitting the next iteration workers_to_wait = 1 - if n_r >= total_number_classifiers - 1 or time_left <= func_eval_time_limit_secs: + if n_r >= total_number_classifiers - 1 or time_left <= self._func_eval_time_limit_secs: # If on the last iteration, flush out all tasks workers_to_wait = len(dask_futures) @@ -675,7 +674,7 @@ def _do_traditional_prediction(self, time_left: int, func_eval_time_limit_secs: time_left -= int(time.time() - start_time) # Exit if no more time is available for a new classifier - if time_left < func_eval_time_limit_secs: + if time_left < self._func_eval_time_limit_secs: self._logger.warning("Not enough time to fit all traditional machine learning models." "Please consider increasing the run time to further improve performance.") break @@ -686,36 +685,30 @@ def _run_dummy_predictions(self) -> None: self._do_dummy_prediction() self._stopwatch.stop_task(dummy_task_name) - def _run_traditional_ml(self, - enable_traditional_pipeline: bool, - func_eval_time_limit_secs: Optional[int] = None) -> None: + def _run_traditional_ml(self) -> None: """We would like to obtain training time for at least 1 Neural network in SMAC""" + assert self._logger is not None - if enable_traditional_pipeline: - if STRING_TO_TASK_TYPES[self.task_type] in REGRESSION_TASKS: - self._logger.warning("Traditional Pipeline is not enabled for regression. Skipping...") - else: - traditional_task_name = 'runTraditional' - self._stopwatch.start_task(traditional_task_name) - elapsed_time = self._stopwatch.wall_elapsed(self.dataset_name) + if STRING_TO_TASK_TYPES[self.task_type] in REGRESSION_TASKS: + self._logger.warning("Traditional Pipeline is not enabled for regression. 
Skipping...") + else: + traditional_task_name = 'runTraditional' + self._stopwatch.start_task(traditional_task_name) + elapsed_time = self._stopwatch.wall_elapsed(self.dataset_name) - time_for_traditional = int( - self._time_for_task - elapsed_time - func_eval_time_limit_secs - ) - self.num_run = self._do_traditional_prediction( - func_eval_time_limit_secs=func_eval_time_limit_secs, - time_left=time_for_traditional, - ) - self._stopwatch.stop_task(traditional_task_name) + time_for_traditional = int( + self._total_walltime_limit - elapsed_time - self._func_eval_time_limit_secs + ) + self._do_traditional_prediction(time_left=time_for_traditional) + self._stopwatch.stop_task(traditional_task_name) - def _run_ensemble(self, - dataset: BaseDataset, - optimize_metric: str, - total_walltime_limit: int, + def _run_ensemble(self, dataset: BaseDataset, optimize_metric: str, precision: int) -> EnsembleBuilderManager: + assert self._logger is not None + elapsed_time = self._stopwatch.wall_elapsed(self.dataset_name) - time_left_for_ensembles = max(0, total_walltime_limit - elapsed_time) + time_left_for_ensembles = max(0, self._total_walltime_limit - elapsed_time) proc_ensemble = None if time_left_for_ensembles <= 0 and self.ensemble_size > 0: raise ValueError("Could not run ensemble builder because there " @@ -734,25 +727,20 @@ def _run_ensemble(self, dataset_name=dataset.dataset_name, output_type=STRING_TO_OUTPUT_TYPES[dataset.output_type], task_type=STRING_TO_TASK_TYPES[self.task_type], - metrics=[self._metric], - opt_metric=optimize_metric, + metrics=[self._metric], opt_metric=optimize_metric, ensemble_size=self.ensemble_size, ensemble_nbest=self.ensemble_nbest, max_models_on_disc=self.max_models_on_disc, - seed=self.seed, - max_iterations=None, - read_at_most=sys.maxsize, ensemble_memory_limit=self._memory_limit, - random_state=self.seed, - precision=precision, - logger_port=self._logger_port, + seed=self.seed, max_iterations=None, random_state=self.seed, + read_at_most=sys.maxsize, precision=precision, + logger_port=self._logger_port ) self._stopwatch.stop_task(ensemble_task_name) return proc_ensemble - def _get_budget_config(self, - budget_type: Optional[str] = None, + def _get_budget_config(self, budget_type: Optional[str] = None, budget: Optional[float] = None) -> Dict[str, Union[float, str]]: budget_config: Dict[str, Union[float, str]] = {} @@ -764,13 +752,18 @@ def _get_budget_config(self, return budget_config - def _start_smac(self, proc_smac: AutoMLSMBO): + def _start_smac(self, proc_smac: AutoMLSMBO) -> None: + assert self._logger is not None + try: self.run_history, self.trajectory, budget_type = \ proc_smac.run_smbo() trajectory_filename = os.path.join( self._backend.get_smac_output_directory_for_run(self.seed), 'trajectory.json') + + assert self.trajectory is not None + saveable_trajectory = \ [list(entry[:2]) + [entry[2].get_dictionary()] + list(entry[3:]) for entry in self.trajectory] @@ -784,20 +777,17 @@ def _start_smac(self, proc_smac: AutoMLSMBO): except Exception as e: self._logger.warning(f"Could not save {trajectory_filename} due to {e}...") - def _run_smac(self, - dataset: BaseDataset, - proc_ensemble: EnsembleBuilderManager, - total_walltime_limit: int, - budget_type: Optional[str] = None, - budget: Optional[float] = None, - func_eval_time_limit_secs: Optional[int] = None, + def _run_smac(self, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager, + budget_type: Optional[str] = None, budget: Optional[float] = None, get_smac_object_callback: Optional[Callable] = None, 
smac_scenario_args: Optional[Dict[str, Any]] = None) -> None: + assert self._logger is not None + smac_task_name = 'runSMAC' self._stopwatch.start_task(smac_task_name) elapsed_time = self._stopwatch.wall_elapsed(self.experiment_task_name) - time_left_for_smac = max(0, total_walltime_limit - elapsed_time) + time_left_for_smac = max(0, self._total_walltime_limit - elapsed_time) self._logger.info(f"Run SMAC with {time_left_for_smac:.2f} sec time left") if time_left_for_smac <= 0: @@ -808,14 +798,12 @@ def _run_smac(self, config_space=self.search_space, dataset_name=dataset.dataset_name, backend=self._backend, - total_walltime_limit=total_walltime_limit, - func_eval_time_limit_secs=func_eval_time_limit_secs, + total_walltime_limit=self._total_walltime_limit, + func_eval_time_limit_secs=self._func_eval_time_limit_secs, dask_client=self._dask_client, memory_limit=self._memory_limit, - n_jobs=self.n_jobs, - watcher=self._stopwatch, - metric=self._metric, - seed=self.seed, + n_jobs=self.n_jobs, watcher=self._stopwatch, + metric=self._metric, seed=self.seed, include=self.include_components, exclude=self.exclude_components, disable_file_output=self._disable_file_output, @@ -833,8 +821,9 @@ def _run_smac(self, def _search_settings(self, dataset: BaseDataset, disable_file_output: List, optimize_metric: str, memory_limit: Optional[int] = 4096, - total_walltime_limit: int = 100, all_supported_metrics: bool = True - ) -> None: + func_eval_time_limit_secs: Optional[int] = None, + total_walltime_limit: int = 100, + all_supported_metrics: bool = True) -> None: """Initialise information needed for the experiment""" self.experiment_task_name = 'runSearch' @@ -847,12 +836,13 @@ def _search_settings(self, dataset: BaseDataset, disable_file_output: List, self._all_supported_metrics = all_supported_metrics self._disable_file_output = disable_file_output self._memory_limit = memory_limit - self._time_for_task = total_walltime_limit + self._total_walltime_limit = total_walltime_limit + self._func_eval_time_limit_secs = func_eval_time_limit_secs self._metric = get_metrics( names=[optimize_metric], dataset_properties=dataset_properties)[0] if self._logger is None: - self._logger = self._get_logger(self.dataset_name) + self._logger = self._get_logger(str(self.dataset_name)) # Save start time to backend self._backend.save_start_time(str(self.seed)) @@ -872,36 +862,34 @@ def _search_settings(self, dataset: BaseDataset, disable_file_output: List, else: self._is_dask_client_internally_created = False - def _adapt_time_resource_allocation(self, - total_walltime_limit: int, - func_eval_time_limit_secs: Optional[int] = None - ) -> int: + def _adapt_time_resource_allocation(self) -> None: + assert self._logger is not None # Handle time resource allocation elapsed_time = self._stopwatch.wall_elapsed(self.experiment_task_name) - time_left_for_modelfit = int(max(0, total_walltime_limit - elapsed_time)) - if func_eval_time_limit_secs is None or func_eval_time_limit_secs > time_left_for_modelfit: + time_left_for_modelfit = int(max(0, self._total_walltime_limit - elapsed_time)) + if self._func_eval_time_limit_secs is None or self._func_eval_time_limit_secs > time_left_for_modelfit: self._logger.warning( 'Time limit for a single run is higher than total time ' 'limit. 
Capping the limit for a single run to the total ' 'time given to SMAC (%f)' % time_left_for_modelfit ) - func_eval_time_limit_secs = time_left_for_modelfit + self._func_eval_time_limit_secs = time_left_for_modelfit # Make sure that at least 2 models are created for the ensemble process - num_models = time_left_for_modelfit // func_eval_time_limit_secs + num_models = time_left_for_modelfit // self._func_eval_time_limit_secs if num_models < 2: - func_eval_time_limit_secs = time_left_for_modelfit // 2 + self._func_eval_time_limit_secs = time_left_for_modelfit // 2 self._logger.warning( "Capping the func_eval_time_limit_secs to {} to have " "time for a least 2 models to ensemble.".format( - func_eval_time_limit_secs + self._func_eval_time_limit_secs ) ) - return func_eval_time_limit_secs - def _save_ensemble_performance_history(self, proc_ensemble: EnsembleBuilderManager) -> None: + assert self._logger is not None + if len(proc_ensemble.futures) > 0: # Also add ensemble runs that did not finish within smac time # and add them into the ensemble history @@ -920,6 +908,7 @@ def _save_ensemble_performance_history(self, proc_ensemble: EnsembleBuilderManag def _finish_experiment(self, proc_ensemble: EnsembleBuilderManager, load_models: bool) -> None: + assert self._logger is not None # Wait until the ensemble process is finished to avoid shutting down # while the ensemble builder tries to access the data self._logger.info("Start Shutdown") @@ -941,23 +930,18 @@ def _finish_experiment(self, proc_ensemble: EnsembleBuilderManager, self._logger.info("Starting to clean up the logger") self._clean_logger() - def _search( - self, - optimize_metric: str, - dataset: BaseDataset, - budget_type: Optional[str] = None, - budget: Optional[float] = None, - total_walltime_limit: int = 100, - func_eval_time_limit_secs: Optional[int] = None, - enable_traditional_pipeline: bool = True, - memory_limit: Optional[int] = 4096, - smac_scenario_args: Optional[Dict[str, Any]] = None, - get_smac_object_callback: Optional[Callable] = None, - all_supported_metrics: bool = True, - precision: int = 32, - disable_file_output: List = [], - load_models: bool = True, - ) -> 'BaseTask': + def _search(self, optimize_metric: str, + dataset: BaseDataset, budget_type: Optional[str] = None, + budget: Optional[float] = None, + total_walltime_limit: int = 100, + func_eval_time_limit_secs: Optional[int] = None, + enable_traditional_pipeline: bool = True, + memory_limit: Optional[int] = 4096, + smac_scenario_args: Optional[Dict[str, Any]] = None, + get_smac_object_callback: Optional[Callable] = None, + all_supported_metrics: bool = True, + precision: int = 32, disable_file_output: List = [], + load_models: bool = True) -> 'BaseTask': """ Search for the best pipeline configuration for the given dataset. 
@@ -1045,25 +1029,21 @@ def _search( self._search_settings(dataset=dataset, disable_file_output=disable_file_output, optimize_metric=optimize_metric, memory_limit=memory_limit, all_supported_metrics=all_supported_metrics, + func_eval_time_limit_secs=func_eval_time_limit_secs, total_walltime_limit=total_walltime_limit) - func_eval_time_limit_secs = self._adapt_time_resource_allocation( - total_walltime_limit=total_walltime_limit, - func_eval_time_limit_secs=func_eval_time_limit_secs - ) - + self._adapt_time_resource_allocation() self.num_run = 1 self._run_dummy_predictions() - self._run_traditional_ml(enable_traditional_pipeline=enable_traditional_pipeline, - func_eval_time_limit_secs=func_eval_time_limit_secs) + + if not enable_traditional_pipeline: + self._run_traditional_ml() + proc_ensemble = self._run_ensemble(dataset=dataset, precision=precision, - optimize_metric=optimize_metric, - total_walltime_limit=total_walltime_limit) + optimize_metric=optimize_metric) self._run_smac(budget=budget, budget_type=budget_type, proc_ensemble=proc_ensemble, - dataset=dataset, total_walltime_limit=total_walltime_limit, - func_eval_time_limit_secs=func_eval_time_limit_secs, - get_smac_object_callback=get_smac_object_callback, + dataset=dataset, get_smac_object_callback=get_smac_object_callback, smac_scenario_args=smac_scenario_args) self._finish_experiment(proc_ensemble=proc_ensemble, load_models=load_models) From eceae7080e0b16500c97344c5348935fe3e41567 Mon Sep 17 00:00:00 2001 From: nabenabe0928 Date: Thu, 15 Apr 2021 01:55:19 +0900 Subject: [PATCH 4/9] [fix] Fix a condition --- autoPyTorch/api/base_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index 5cc4f233c..0330f2ba3 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -1036,7 +1036,7 @@ def _search(self, optimize_metric: str, self.num_run = 1 self._run_dummy_predictions() - if not enable_traditional_pipeline: + if enable_traditional_pipeline: self._run_traditional_ml() proc_ensemble = self._run_ensemble(dataset=dataset, precision=precision, From 9481437e513c98a7d58200b7967c9a57cdc3c62b Mon Sep 17 00:00:00 2001 From: nabenabe0928 Date: Thu, 15 Apr 2021 05:04:44 +0900 Subject: [PATCH 5/9] [merge] Merge the latest update in refactor_developtment --- autoPyTorch/api/base_task.py | 101 ++++++++++++++++++++--------------- 1 file changed, 59 insertions(+), 42 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index 0330f2ba3..afc128625 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -25,7 +25,7 @@ import pandas as pd -from smac.runhistory.runhistory import RunHistory +from smac.runhistory.runhistory import DataOrigin, RunHistory from smac.stats.stats import Stats from smac.tae import StatusType @@ -172,11 +172,10 @@ def __init__( self.search_space: Optional[ConfigurationSpace] = None self._metric: Optional[autoPyTorchMetric] = None self._logger: Optional[PicklableClientLogger] = None - self.run_history: Optional[RunHistory] = None + self.run_history: RunHistory = RunHistory() self.trajectory: Optional[List] = None self.dataset_name: Optional[str] = None self.cv_models_: Dict = {} - self.num_run: int = 1 self.experiment_task_name: str = 'runSearch' # By default try to use the TCP logging port or get a new port @@ -492,6 +491,9 @@ def _do_dummy_prediction(self) -> None: assert self._metric is not None assert self._logger is not None + # For dummy estimator, we always expect 
the num_run to be 1 + num_run = 1 + self._logger.info("Starting to create dummy predictions.") memory_limit = self._memory_limit @@ -511,14 +513,14 @@ def _do_dummy_prediction(self) -> None: logger_port=self._logger_port, cost_for_crash=get_cost_of_crash(self._metric), abort_on_first_run_crash=False, - initial_num_run=self.num_run, + initial_num_run=num_run, stats=stats, memory_limit=memory_limit, disable_file_output=True if len(self._disable_file_output) > 0 else False, all_supported_metrics=self._all_supported_metrics ) - status, cost, runtime, additional_info = ta.run(self.num_run, cutoff=self._total_walltime_limit) + status, cost, runtime, additional_info = ta.run(num_run, cutoff=self._total_walltime_limit) if status == StatusType.SUCCESS: self._logger.info("Finished creating dummy predictions.") else: @@ -560,20 +562,12 @@ def _do_traditional_prediction(self, time_left: int) -> None: This method currently only supports classification. Args: - num_run: (int) - An identifier to indicate the current machine learning algorithm - being processed time_left: (int) Hard limit on how many machine learning algorithms can be fit. Depending on how fast a traditional machine learning algorithm trains, it will allow multiple models to be fitted. func_eval_time_limit_secs: (int) Maximum training time each algorithm is allowed to take, during training - - Returns: - num_run: (int) - The incremented identifier index. This depends on how many machine learning - models were fitted. """ # Mypy Checkings -- Traditional prediction is only called for search @@ -582,7 +576,10 @@ def _do_traditional_prediction(self, time_left: int) -> None: assert self._logger is not None assert self._dask_client is not None - self.num_run += 1 + self._logger.info("Starting to create traditional classifier predictions.") + + # Initialise run history for the traditional classifiers + run_history = RunHistory() memory_limit = self._memory_limit if memory_limit is not None: @@ -590,8 +587,8 @@ def _do_traditional_prediction(self, time_left: int) -> None: available_classifiers = get_available_classifiers() dask_futures = [] - total_number_classifiers = len(available_classifiers) + self.num_run - for n_r, classifier in enumerate(available_classifiers, start=self.num_run): + total_number_classifiers = len(available_classifiers) + for n_r, classifier in enumerate(available_classifiers): # Only launch a task if there is time start_time = time.time() @@ -610,7 +607,7 @@ def _do_traditional_prediction(self, time_left: int) -> None: logger_port=self._logger_port, cost_for_crash=get_cost_of_crash(self._metric), abort_on_first_run_crash=False, - initial_num_run=n_r, + initial_num_run=self._backend.get_next_num_run(), stats=stats, memory_limit=memory_limit, disable_file_output=True if len(self._disable_file_output) > 0 else False, @@ -624,9 +621,6 @@ def _do_traditional_prediction(self, time_left: int) -> None: ) ]) - # Increment the launched job index - self.num_run = n_r - # When managing time, we need to take into account the allocated time resources, # which are dependent on the number of cores. 
'dask_futures' is a proxy to the number # of workers /n_jobs that we have, in that if there are 4 cores allocated, we can run at most @@ -653,6 +647,11 @@ def _do_traditional_prediction(self, time_left: int) -> None: if status == StatusType.SUCCESS: self._logger.info( f"Fitting {cls} took {runtime}s, performance:{cost}/{additional_info}") + configuration = additional_info['pipeline_configuration'] + origin = additional_info['configuration_origin'] + run_history.add(config=configuration, cost=cost, + time=runtime, status=status, seed=self.seed, + origin=origin) else: if additional_info.get('exitcode') == -6: self._logger.error( @@ -679,6 +678,13 @@ def _do_traditional_prediction(self, time_left: int) -> None: "Please consider increasing the run time to further improve performance.") break + self._logger.debug("Run history traditional: {}".format(run_history)) + # add run history of traditional to api run history + self.run_history.update(run_history, DataOrigin.EXTERNAL_SAME_INSTANCES) + run_history.save_json(os.path.join(self._backend.internals_directory, 'traditional_run_history.json'), + save_external=True) + return + def _run_dummy_predictions(self) -> None: dummy_task_name = 'runDummy' self._stopwatch.start_task(dummy_task_name) @@ -727,13 +733,17 @@ def _run_ensemble(self, dataset: BaseDataset, optimize_metric: str, dataset_name=dataset.dataset_name, output_type=STRING_TO_OUTPUT_TYPES[dataset.output_type], task_type=STRING_TO_TASK_TYPES[self.task_type], - metrics=[self._metric], opt_metric=optimize_metric, + metrics=[self._metric], + opt_metric=optimize_metric, ensemble_size=self.ensemble_size, ensemble_nbest=self.ensemble_nbest, max_models_on_disc=self.max_models_on_disc, + seed=self.seed, + max_iterations=None, + read_at_most=sys.maxsize, ensemble_memory_limit=self._memory_limit, - seed=self.seed, max_iterations=None, random_state=self.seed, - read_at_most=sys.maxsize, precision=precision, + random_state=self.seed, + precision=precision, logger_port=self._logger_port ) self._stopwatch.stop_task(ensemble_task_name) @@ -756,8 +766,9 @@ def _start_smac(self, proc_smac: AutoMLSMBO) -> None: assert self._logger is not None try: - self.run_history, self.trajectory, budget_type = \ + run_history, self.trajectory, budget_type = \ proc_smac.run_smbo() + self.run_history.update(run_history, DataOrigin.INTERNAL) trajectory_filename = os.path.join( self._backend.get_smac_output_directory_for_run(self.seed), 'trajectory.json') @@ -802,8 +813,10 @@ def _run_smac(self, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager, func_eval_time_limit_secs=self._func_eval_time_limit_secs, dask_client=self._dask_client, memory_limit=self._memory_limit, - n_jobs=self.n_jobs, watcher=self._stopwatch, - metric=self._metric, seed=self.seed, + n_jobs=self.n_jobs, + watcher=self._stopwatch, + metric=self._metric, + seed=self.seed, include=self.include_components, exclude=self.exclude_components, disable_file_output=self._disable_file_output, @@ -813,7 +826,7 @@ def _run_smac(self, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager, pipeline_config={**self.pipeline_options, **budget_config}, ensemble_callback=proc_ensemble, logger_port=self._logger_port, - start_num_run=self.num_run, + start_num_run=self._backend.get_next_num_run(peek=True), search_space_updates=self.search_space_updates ) @@ -930,18 +943,23 @@ def _finish_experiment(self, proc_ensemble: EnsembleBuilderManager, self._logger.info("Starting to clean up the logger") self._clean_logger() - def _search(self, optimize_metric: str, - 
dataset: BaseDataset, budget_type: Optional[str] = None, - budget: Optional[float] = None, - total_walltime_limit: int = 100, - func_eval_time_limit_secs: Optional[int] = None, - enable_traditional_pipeline: bool = True, - memory_limit: Optional[int] = 4096, - smac_scenario_args: Optional[Dict[str, Any]] = None, - get_smac_object_callback: Optional[Callable] = None, - all_supported_metrics: bool = True, - precision: int = 32, disable_file_output: List = [], - load_models: bool = True) -> 'BaseTask': + def _search( + self, + optimize_metric: str, + dataset: BaseDataset, + budget_type: Optional[str] = None, + budget: Optional[float] = None, + total_walltime_limit: int = 100, + func_eval_time_limit_secs: Optional[int] = None, + enable_traditional_pipeline: bool = True, + memory_limit: Optional[int] = 4096, + smac_scenario_args: Optional[Dict[str, Any]] = None, + get_smac_object_callback: Optional[Callable] = None, + all_supported_metrics: bool = True, + precision: int = 32, + disable_file_output: List = [], + load_models: bool = True + ) -> 'BaseTask': """ Search for the best pipeline configuration for the given dataset. @@ -1033,7 +1051,6 @@ def _search(self, optimize_metric: str, total_walltime_limit=total_walltime_limit) self._adapt_time_resource_allocation() - self.num_run = 1 self._run_dummy_predictions() if enable_traditional_pipeline: @@ -1098,7 +1115,7 @@ def refit( 'train_indices': dataset.splits[split_id][0], 'val_indices': dataset.splits[split_id][1], 'split_id': split_id, - 'num_run': 0 + 'num_run': self._backend.get_next_num_run(), }) X.update({**self.pipeline_options, **budget_config}) if self.models_ is None or len(self.models_) == 0 or self.ensemble_ is None: @@ -1175,7 +1192,7 @@ def fit(self, 'train_indices': dataset.splits[split_id][0], 'val_indices': dataset.splits[split_id][1], 'split_id': split_id, - 'num_run': 0 + 'num_run': self._backend.get_next_num_run(), }) X.update({**self.pipeline_options, **budget_config}) From b7726a88fdc43e8d65b944479fcff0a71cda9f86 Mon Sep 17 00:00:00 2001 From: nabenabe0928 Date: Thu, 15 Apr 2021 05:33:47 +0900 Subject: [PATCH 6/9] [fix] Fix mypy issues --- autoPyTorch/api/base_task.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index afc128625..427522beb 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -174,7 +174,7 @@ def __init__( self._logger: Optional[PicklableClientLogger] = None self.run_history: RunHistory = RunHistory() self.trajectory: Optional[List] = None - self.dataset_name: Optional[str] = None + self.dataset_name: str = "" self.cv_models_: Dict = {} self.experiment_task_name: str = 'runSearch' @@ -702,6 +702,7 @@ def _run_traditional_ml(self) -> None: self._stopwatch.start_task(traditional_task_name) elapsed_time = self._stopwatch.wall_elapsed(self.dataset_name) + assert self._func_eval_time_limit_secs is not None time_for_traditional = int( self._total_walltime_limit - elapsed_time - self._func_eval_time_limit_secs ) @@ -709,9 +710,10 @@ def _run_traditional_ml(self) -> None: self._stopwatch.stop_task(traditional_task_name) def _run_ensemble(self, dataset: BaseDataset, optimize_metric: str, - precision: int) -> EnsembleBuilderManager: + precision: int) -> Optional[EnsembleBuilderManager]: assert self._logger is not None + assert self._metric is not None elapsed_time = self._stopwatch.wall_elapsed(self.dataset_name) time_left_for_ensembles = max(0, self._total_walltime_limit - elapsed_time) @@ -788,7 
+790,7 @@ def _start_smac(self, proc_smac: AutoMLSMBO) -> None: except Exception as e: self._logger.warning(f"Could not save {trajectory_filename} due to {e}...") - def _run_smac(self, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager, + def _run_smac(self, dataset: BaseDataset, proc_ensemble: Optional[EnsembleBuilderManager], budget_type: Optional[str] = None, budget: Optional[float] = None, get_smac_object_callback: Optional[Callable] = None, smac_scenario_args: Optional[Dict[str, Any]] = None) -> None: @@ -805,6 +807,9 @@ def _run_smac(self, dataset: BaseDataset, proc_ensemble: EnsembleBuilderManager, self._logger.warning(" Could not run SMAC because there is no time left") else: budget_config = self._get_budget_config(budget_type=budget_type, budget=budget) + + assert self._func_eval_time_limit_secs is not None + assert self._metric is not None proc_smac = AutoMLSMBO( config_space=self.search_space, dataset_name=dataset.dataset_name, @@ -1095,7 +1100,7 @@ def refit( Returns: self """ - if self.dataset_name is None: + if self.dataset_name == "": self.dataset_name = str(uuid.uuid1(clock_seq=os.getpid())) if self._logger is None: @@ -1165,7 +1170,7 @@ def fit(self, Returns: (BasePipeline): fitted pipeline """ - if self.dataset_name is None: + if self.dataset_name == "": self.dataset_name = str(uuid.uuid1(clock_seq=os.getpid())) if self._logger is None: From 60b71e05c2decf4e04f5b336da55f02ead10ee37 Mon Sep 17 00:00:00 2001 From: nabenabe0928 Date: Thu, 15 Apr 2021 05:40:25 +0900 Subject: [PATCH 7/9] [merge] Complete merge with refactor_development --- autoPyTorch/api/base_task.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index 5254d9b4c..b80499492 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -923,7 +923,7 @@ def _save_ensemble_performance_history(self, proc_ensemble: EnsembleBuilderManag pd.DataFrame(self.ensemble_performance_history).to_json( os.path.join(self._backend.internals_directory, 'ensemble_history.json')) - def _finish_experiment(self, proc_ensemble: EnsembleBuilderManager, + def _finish_experiment(self, proc_ensemble: Optional[EnsembleBuilderManager], load_models: bool) -> None: assert self._logger is not None From 432cd079dd7f5657770ae19e9cd00d5039861863 Mon Sep 17 00:00:00 2001 From: nabenabe0928 Date: Thu, 15 Apr 2021 05:42:03 +0900 Subject: [PATCH 8/9] [fix] Fix a flake8 issue --- autoPyTorch/api/base_task.py | 1 - 1 file changed, 1 deletion(-) diff --git a/autoPyTorch/api/base_task.py b/autoPyTorch/api/base_task.py index b80499492..41b20f747 100644 --- a/autoPyTorch/api/base_task.py +++ b/autoPyTorch/api/base_task.py @@ -1061,7 +1061,6 @@ def _search( if enable_traditional_pipeline: self._run_traditional_ml() - proc_ensemble = self._run_ensemble(dataset=dataset, precision=precision, optimize_metric=optimize_metric) From 91d56da0965d10d57720c3de6d396f7d74774f26 Mon Sep 17 00:00:00 2001 From: nabenabe0928 Date: Thu, 15 Apr 2021 23:30:37 +0900 Subject: [PATCH 9/9] [fix] Fix a bug from the rebase in test_api --- test/test_api/test_api.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_api/test_api.py b/test/test_api/test_api.py index 7866e7674..055ce7aec 100644 --- a/test/test_api/test_api.py +++ b/test/test_api/test_api.py @@ -407,7 +407,7 @@ def test_tabular_input_support(openml_id, backend): @pytest.mark.parametrize("fit_dictionary_tabular", ['classification_categorical_only'], indirect=True) -def 
test_do_dummy_prediction(dask_client, fit_dictionary_tabular): +def test_run_dummy_prediction(dask_client, fit_dictionary_tabular): backend = fit_dictionary_tabular['backend'] estimator = TabularClassificationTask( backend=backend, @@ -424,7 +424,7 @@ def test_do_dummy_prediction(dask_client, fit_dictionary_tabular): estimator._disable_file_output = [] estimator._all_supported_metrics = False - estimator._do_dummy_prediction() + estimator._run_dummy_prediction() # Ensure that the dummy predictions are not in the current working # directory, but in the temporary directory.
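
Note (editorial, not part of the patches): the series above converges on a `_search()` that only orchestrates small stage helpers which read shared settings from `self`. Below is a minimal, self-contained sketch of that shape, assuming simplified stand-in names (`MiniTask`, `_elapsed`, `search`); it is not autoPyTorch code, and the real helpers also thread the logger, stopwatch, backend, and dask client through `self`.

# Sketch of the refactored orchestration: settings are captured once as
# attributes, then each stage is a small helper that reads them.
import time
from typing import Optional


class MiniTask:
    def __init__(self, total_walltime_limit: int = 100) -> None:
        self._total_walltime_limit = total_walltime_limit
        self._func_eval_time_limit_secs: Optional[int] = None
        self._start_time = 0.0

    def _elapsed(self) -> float:
        # Stand-in for self._stopwatch.wall_elapsed(self.experiment_task_name)
        return time.time() - self._start_time

    def _search_settings(self, func_eval_time_limit_secs: Optional[int]) -> None:
        # Store experiment-wide settings so later stages need no extra arguments.
        self._start_time = time.time()
        self._func_eval_time_limit_secs = func_eval_time_limit_secs

    def _adapt_time_resource_allocation(self) -> None:
        # Cap the per-evaluation budget so at least two models fit into the
        # remaining wallclock time (mirrors the warning logic in the patch).
        time_left = int(max(0, self._total_walltime_limit - self._elapsed()))
        limit = self._func_eval_time_limit_secs
        if limit is None or limit > time_left:
            limit = time_left
        if limit and time_left // limit < 2:
            limit = time_left // 2
        self._func_eval_time_limit_secs = limit

    def _run_dummy_predictions(self) -> None:
        print("dummy predictions")

    def _run_traditional_ml(self) -> None:
        print(f"traditional ML, per-run cutoff={self._func_eval_time_limit_secs}s")

    def _run_ensemble(self) -> Optional[str]:
        # Returns a handle to the ensemble builder, or None if it is disabled.
        return "ensemble-builder"

    def _run_smac(self, proc_ensemble: Optional[str]) -> None:
        print(f"SMAC with ensemble callback={proc_ensemble}")

    def _finish_experiment(self, proc_ensemble: Optional[str]) -> None:
        print("shutdown: collect ensemble history, close dask, load models")

    def search(self, enable_traditional_pipeline: bool = True,
               func_eval_time_limit_secs: Optional[int] = None) -> "MiniTask":
        # The refactored _search(): one call per stage, in order.
        self._search_settings(func_eval_time_limit_secs)
        self._adapt_time_resource_allocation()
        self._run_dummy_predictions()
        if enable_traditional_pipeline:
            self._run_traditional_ml()
        proc_ensemble = self._run_ensemble()
        self._run_smac(proc_ensemble)
        self._finish_experiment(proc_ensemble)
        return self


if __name__ == "__main__":
    MiniTask(total_walltime_limit=60).search(func_eval_time_limit_secs=200)

The design choice the sketch illustrates is the same one the later patches make explicit: stages communicate through attributes (for example `_func_eval_time_limit_secs`) or through an authoritative external counter (`self._backend.get_next_num_run()`) instead of passing and returning `num_run` between helpers.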