diff --git a/pandaharvester/commit_timestamp.py b/pandaharvester/commit_timestamp.py index 996caf4c..dabe0c3c 100644 --- a/pandaharvester/commit_timestamp.py +++ b/pandaharvester/commit_timestamp.py @@ -1 +1 @@ -timestamp = "07-04-2026 12:44:11 on flin (by mightqxc)" +timestamp = "14-04-2026 09:20:49 on flin (by mightqxc)" diff --git a/pandaharvester/harvesterbody/worker_adjuster.py b/pandaharvester/harvesterbody/worker_adjuster.py index c4c3b838..5256374e 100644 --- a/pandaharvester/harvesterbody/worker_adjuster.py +++ b/pandaharvester/harvesterbody/worker_adjuster.py @@ -26,6 +26,7 @@ pl.Config.set_tbl_rows(-1) pl.Config.set_tbl_cols(-1) pl.Config.set_tbl_width_chars(140) +pl.Config.set_tbl_cell_numeric_alignment("RIGHT") # class to define number of workers to submit @@ -247,6 +248,299 @@ def get_activate_worker_factor(self, site_name=None, job_type=None, resource_typ tmp_log.debug(f"ret_val={ret_val}") return ret_val + # Helper methods for dataframe transformations in define_num_workers + + def _build_new_workers_df(self, static_num_workers, queue_name, tmp_log): + """Build and filter workers dataframe for a given queue.""" + return ( + self._num_workers_dict_to_df(static_num_workers) + .filter(pl.col("queue_name") == queue_name) + .filter(pl.col("resource_type").is_not_null()) + .filter(pl.col("pilot_type").is_not_null()) + .with_columns( + [ + pl.col("queue_name").fill_null(pl.lit(queue_name)), + pl.col("nQueue").fill_null(0), + pl.col("nReady").fill_null(0), + pl.col("nRunning").fill_null(0), + pl.col("nNewWorkers").fill_null(0), + ] + ) + ) + + def _build_activated_df(self, job_stats_new_df, queue_name, tmp_log): + """Build activated jobs dataframe with aggregations for ANY pilot types.""" + activated_df = ( + job_stats_new_df.filter((pl.col("computing_site") == queue_name) & (pl.col("job_status") == "activated")) + .with_columns( + pl.col("computing_site").alias("queue_name"), + pl.col("prod_source_label").map_elements(core_utils.prod_source_label_to_pilot_type, return_dtype=pl.Utf8).alias("pilot_type"), + ) + .select(["queue_name", "resource_type", "pilot_type", "n_jobs"]) + ) + # Add aggregated rows with pilot_type="ANY" (sum over all pilot_types for each resource_type) + activated_df_any_pt = ( + activated_df.select(["queue_name", "resource_type", "n_jobs"]) + .group_by(["queue_name", "resource_type"]) + .agg(pl.col("n_jobs").sum()) + .with_columns(pl.lit("ANY").alias("pilot_type")) + .select(["queue_name", "resource_type", "pilot_type", "n_jobs"]) + ) + # Add aggregated row with both resource_type="ANY" and pilot_type="ANY" (sum over all) + activated_df_any_both = ( + activated_df.select(["queue_name", "n_jobs"]) + .group_by(["queue_name"]) + .agg(pl.col("n_jobs").sum()) + .with_columns(pl.lit("ANY").alias("resource_type"), pl.lit("ANY").alias("pilot_type")) + .select(["queue_name", "resource_type", "pilot_type", "n_jobs"]) + ) + return pl.concat([activated_df, activated_df_any_pt, activated_df_any_both]) + + def _join_workers_activated_dfs(self, activated_df, tmp_new_workers_df, queue_name, tmp_log): + """Join activated jobs and workers dataframes.""" + return ( + activated_df.join( + tmp_new_workers_df, + on=["queue_name", "resource_type", "pilot_type"], + how="full", + suffix="_right", + ) + .with_columns( + [ + pl.col("queue_name").fill_null(pl.lit(queue_name)), + pl.col("job_type").fill_null(DEFAULT_JOB_TYPE), + pl.coalesce(pl.col("resource_type"), pl.col("resource_type_right")).fill_null(pl.lit("ANY")).alias("resource_type"), + pl.coalesce(pl.col("pilot_type"), pl.col("pilot_type_right")).fill_null(pl.lit("ANY")).alias("pilot_type"), + pl.col("nQueue").fill_null(0), + pl.col("nReady").fill_null(0), + pl.col("nRunning").fill_null(0), + pl.col("nNewWorkers").fill_null(0), + pl.col("n_jobs").fill_null(0), + ] + ) + .select(pl.all().exclude(["resource_type_right", "pilot_type_right"])) + ) + + def _build_master_df(self, joined_df, queue_name, static_num_workers, tmp_log): + """Build master dataframe with grouping, default rows, and sorting.""" + tmp_master_df = joined_df.group_by(["queue_name", "job_type", "resource_type", "pilot_type"]).agg( + pl.col("nQueue").max(), + pl.col("nReady").max(), + pl.col("nRunning").max(), + pl.col("nNewWorkers").max(), + pl.col("n_jobs").sum().alias("n_activated_jobs"), + ) + + # Ensure DEFAULT_PILOT_TYPE exists in static_num_workers and master_df for all (job_type, resource_type) pairs + required_default_rows = [] + for job_type in static_num_workers[queue_name]: + for resource_type in static_num_workers[queue_name][job_type]: + static_num_workers[queue_name][job_type][resource_type].setdefault( + DEFAULT_PILOT_TYPE, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0} + ) + required_default_rows.append( + { + "queue_name": queue_name, + "job_type": job_type, + "resource_type": resource_type, + } + ) + + if required_default_rows: + required_default_df = pl.DataFrame(required_default_rows) + existing_default_df = ( + tmp_master_df.filter((pl.col("queue_name") == queue_name) & (pl.col("pilot_type") == DEFAULT_PILOT_TYPE)) + .select(["queue_name", "job_type", "resource_type"]) + .unique() + ) + missing_default_df = required_default_df.join( + existing_default_df, + on=["queue_name", "job_type", "resource_type"], + how="anti", + ) + if missing_default_df.height > 0: + missing_default_df = missing_default_df.with_columns( + [ + pl.lit(DEFAULT_PILOT_TYPE).alias("pilot_type"), + pl.lit(0, dtype=pl.Int64).alias("nQueue"), + pl.lit(0, dtype=pl.Int64).alias("nReady"), + pl.lit(0, dtype=pl.Int64).alias("nRunning"), + pl.lit(0, dtype=pl.Int64).alias("nNewWorkers"), + pl.lit(0, dtype=pl.Int64).alias("n_activated_jobs"), + ] + ).select( + [ + "queue_name", + "job_type", + "resource_type", + "pilot_type", + "nQueue", + "nReady", + "nRunning", + "nNewWorkers", + "n_activated_jobs", + ] + ) + tmp_master_df = pl.concat([tmp_master_df, missing_default_df]) + + # Sort master_df to have consistent order and prioritized pilot types on top + return tmp_master_df.sort( + [ + "queue_name", + pl.when(pl.col("job_type") == "ANY").then(1).otherwise(0), + "job_type", + pl.when(pl.col("resource_type") == "ANY").then(1).otherwise(0), + "resource_type", + pl.when(pl.col("pilot_type") == "ANY").then(2).when(pl.col("pilot_type") == DEFAULT_PILOT_TYPE).then(0).otherwise(1), + "pilot_type", + ] + ) + + def _sync_master_df_to_static_workers(self, tmp_master_df, queue_name, tmp_static_num_workers, tmp_log): + """Update tmp_static_num_workers dict from master dataframe.""" + for row in tmp_master_df.iter_rows(named=True): + queue_name_from_row = row["queue_name"] + job_type = row["job_type"] + resource_type = row["resource_type"] + pilot_type = row["pilot_type"] + # create missing keys in nested dictionary + if queue_name_from_row not in tmp_static_num_workers: + tmp_static_num_workers[queue_name_from_row] = {} + if job_type not in tmp_static_num_workers[queue_name_from_row]: + tmp_static_num_workers[queue_name_from_row][job_type] = {} + if resource_type not in tmp_static_num_workers[queue_name_from_row][job_type]: + tmp_static_num_workers[queue_name_from_row][job_type][resource_type] = {} + if pilot_type not in tmp_static_num_workers[queue_name_from_row][job_type][resource_type]: + tmp_static_num_workers[queue_name_from_row][job_type][resource_type][pilot_type] = {} + # update values + tmp_static_num_workers[queue_name_from_row][job_type][resource_type][pilot_type].update( + { + "nQueue": row["nQueue"], + "nReady": row["nReady"], + "nRunning": row["nRunning"], + "nNewWorkers": row["nNewWorkers"], + } + ) + + def _set_initial_new_workers( + self, tmp_master_df, tmp_static_num_workers, static_num_workers, queue_name, master_df, queue_dict, queue_config, prioritized_pilot_types, tmp_log + ): + """Set initial nNewWorkers for pilot types based on activated jobs and activation factor.""" + for job_type in tmp_static_num_workers[queue_name]: + for resource_type, pilot_type_dict in tmp_static_num_workers[queue_name][job_type].items(): + total_n_new_workers = pilot_type_dict["ANY"]["nNewWorkers"] + if total_n_new_workers <= 0: + continue + # calculate the total number of new workers needed for prioritized pilot types + remaining_n_new_workers = total_n_new_workers + activate_worker_factor = self.get_activate_worker_factor(queue_name, job_type, resource_type, queue_dict, queue_config) + prio_ptype_result = tmp_master_df.filter( + (pl.col("queue_name") == queue_name) + & (pl.col("job_type") == job_type) + & (pl.col("resource_type") == resource_type) + & (pl.col("pilot_type").is_in(prioritized_pilot_types)) + ).select([pl.col("n_activated_jobs").sum(), pl.col("nQueue").sum()]) + if prio_ptype_result.shape[0] > 0: + total_prio_ptype_n_activated_jobs, total_prio_ptype_nQueue = prio_ptype_result.row(0) + else: + total_prio_ptype_n_activated_jobs, total_prio_ptype_nQueue = 0, 0 + total_prio_ptype_calculated_n_new_workers = max(int(total_prio_ptype_n_activated_jobs * activate_worker_factor) - total_prio_ptype_nQueue, 0) + if total_prio_ptype_calculated_n_new_workers > 0: + adjust_ratio = min(total_n_new_workers / total_prio_ptype_calculated_n_new_workers, 1) + for pilot_type, tmp_val in pilot_type_dict.items(): + if pilot_type in prioritized_pilot_types: + pt_result = tmp_master_df.filter( + (pl.col("queue_name") == queue_name) + & (pl.col("job_type") == job_type) + & (pl.col("resource_type") == resource_type) + & (pl.col("pilot_type") == pilot_type) + ).select([pl.col("n_activated_jobs"), pl.col("nQueue")]) + if pt_result.shape[0] > 0: + n_activated_jobs, nQueue = pt_result.row(0) + else: + n_activated_jobs, nQueue = 0, 0 + calculated_n_new_workers = int(max(int(n_activated_jobs * activate_worker_factor) - nQueue, 0) * adjust_ratio) + if calculated_n_new_workers <= 0: + continue + tmp_static_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = calculated_n_new_workers + static_num_workers[queue_name].setdefault(job_type, {}).setdefault(resource_type, {}).setdefault( + pilot_type, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0} + )["nNewWorkers"] = calculated_n_new_workers + remaining_n_new_workers -= calculated_n_new_workers + master_df = master_df.with_columns( + pl.when( + (pl.col("queue_name") == queue_name) + & (pl.col("job_type") == job_type) + & (pl.col("resource_type") == resource_type) + & (pl.col("pilot_type") == pilot_type) + ) + .then(pl.lit(calculated_n_new_workers)) + .otherwise(pl.col("nNewWorkers")) + .alias("nNewWorkers") + ) + tmp_log.debug( + f"Set initial nNewWorkers to {calculated_n_new_workers} for queue={queue_name} job_type={job_type} resource_type={resource_type} pilot_type={pilot_type}" + ) + if remaining_n_new_workers > 0: + # allocate remaining n_new_workers to DEFAULT_PILOT_TYPE PR + tmp_static_num_workers[queue_name][job_type][resource_type][DEFAULT_PILOT_TYPE]["nNewWorkers"] += remaining_n_new_workers + static_num_workers[queue_name].setdefault(job_type, {}).setdefault(resource_type, {}).setdefault( + DEFAULT_PILOT_TYPE, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0} + )["nNewWorkers"] = tmp_static_num_workers[queue_name][job_type][resource_type][DEFAULT_PILOT_TYPE]["nNewWorkers"] + tmp_log.debug( + f"Set remaining nNewWorkers to {remaining_n_new_workers} for queue={queue_name} job_type={job_type} resource_type={resource_type} pilot_type={DEFAULT_PILOT_TYPE}" + ) + master_df = master_df.with_columns( + pl.when( + (pl.col("queue_name") == queue_name) + & (pl.col("job_type") == job_type) + & (pl.col("resource_type") == resource_type) + & (pl.col("pilot_type") == DEFAULT_PILOT_TYPE) + ) + .then(pl.lit(remaining_n_new_workers)) + .otherwise(pl.col("nNewWorkers")) + .alias("nNewWorkers") + ) + return master_df + + def _format_result_dataframe(self, dyn_num_workers, queue_name, tmp_log): + """Format result dataframe for logging.""" + dyn_num_workers_rows = [] + for qn, job_types in dyn_num_workers.items(): + for job_type, resource_types in job_types.items(): + for resource_type, pilot_types in resource_types.items(): + for pilot_type, worker_data in pilot_types.items(): + dyn_num_workers_rows.append( + { + "queue_name": qn, + "job_type": job_type, + "resource_type": resource_type, + "pilot_type": pilot_type, + "nQueue": worker_data.get("nQueue", 0), + "nReady": worker_data.get("nReady", 0), + "nRunning": worker_data.get("nRunning", 0), + "nNewWorkers": worker_data.get("nNewWorkers", 0), + } + ) + if dyn_num_workers_rows: + result_df = ( + pl.DataFrame(dyn_num_workers_rows) + .select(pl.all().exclude(["queue_name"])) + .sort( + [ + pl.when(pl.col("job_type") == "ANY").then(1).otherwise(0), + "job_type", + pl.when(pl.col("resource_type") == "ANY").then(1).otherwise(0), + "resource_type", + pl.when(pl.col("pilot_type") == "ANY").then(2).when(pl.col("pilot_type") == DEFAULT_PILOT_TYPE).then(0).otherwise(1), + "pilot_type", + ] + ) + ) + tmp_log.debug(f"result_df:\n{result_df}") + else: + tmp_log.debug("result_df: nothing to display") + # define number of workers to submit based on various information def define_num_workers(self, static_num_workers, site_name) -> dict | None: """ @@ -334,217 +628,21 @@ def _normalize_job_type_any(queue_dict): prioritized_pilot_types = [core_utils.prod_source_label_to_pilot_type(label) for label in prioritized_pslabels] - tmp_new_workers_df = ( - self._num_workers_dict_to_df(static_num_workers) - .filter(pl.col("queue_name") == queue_name) - .filter(pl.col("resource_type").is_not_null()) - .filter(pl.col("pilot_type").is_not_null()) - .with_columns( - [ - pl.col("queue_name").fill_null(pl.lit(queue_name)), - pl.col("nQueue").fill_null(0), - pl.col("nReady").fill_null(0), - pl.col("nRunning").fill_null(0), - pl.col("nNewWorkers").fill_null(0), - ] - ) - ) - # tmp_log.debug(f"DEBUG: tmp_new_workers_df after filter shape: {tmp_new_workers_df.shape}") - # tmp_log.debug(f"DEBUG: tmp_new_workers_df columns: {tmp_new_workers_df.columns}") - # tmp_log.debug(f"DEBUG: tmp_new_workers_df:\n{tmp_new_workers_df}") - - activated_df = ( - job_stats_new_df.filter((pl.col("computing_site") == queue_name) & (pl.col("job_status") == "activated")) - .with_columns( - pl.col("computing_site").alias("queue_name"), - pl.col("prod_source_label").map_elements(core_utils.prod_source_label_to_pilot_type, return_dtype=pl.Utf8).alias("pilot_type"), - ) - .select(["queue_name", "resource_type", "pilot_type", "n_jobs"]) - ) - # Add aggregated rows with pilot_type="ANY" (sum over all pilot_types for each resource_type) - activated_df_any_pt = ( - activated_df.select(["queue_name", "resource_type", "n_jobs"]) - .group_by(["queue_name", "resource_type"]) - .agg(pl.col("n_jobs").sum()) - .with_columns(pl.lit("ANY").alias("pilot_type")) - .select(["queue_name", "resource_type", "pilot_type", "n_jobs"]) - ) - # Add aggregated row with both resource_type="ANY" and pilot_type="ANY" (sum over all) - activated_df_any_both = ( - activated_df.select(["queue_name", "n_jobs"]) - .group_by(["queue_name"]) - .agg(pl.col("n_jobs").sum()) - .with_columns(pl.lit("ANY").alias("resource_type"), pl.lit("ANY").alias("pilot_type")) - .select(["queue_name", "resource_type", "pilot_type", "n_jobs"]) - ) + tmp_new_workers_df = self._build_new_workers_df(static_num_workers, queue_name, tmp_log) + activated_df = self._build_activated_df(job_stats_new_df, queue_name, tmp_log) + joined_df = self._join_workers_activated_dfs(activated_df, tmp_new_workers_df, queue_name, tmp_log) - activated_df = pl.concat([activated_df, activated_df_any_pt, activated_df_any_both]) - # tmp_log.debug(f"DEBUG: activated_df after filter shape: {activated_df.shape}") - # tmp_log.debug(f"DEBUG: activated_df columns: {activated_df.columns}") - # tmp_log.debug(f"DEBUG: activated_df:\n{activated_df}") - - joined_df = ( - activated_df.join( - tmp_new_workers_df, - on=["queue_name", "resource_type", "pilot_type"], - how="full", - suffix="_right", - ) - .with_columns( - [ - pl.col("queue_name").fill_null(pl.lit(queue_name)), - pl.col("job_type").fill_null(DEFAULT_JOB_TYPE), - # Use coalesce to prefer left side if not null, otherwise use right side - pl.coalesce(pl.col("resource_type"), pl.col("resource_type_right")).fill_null(pl.lit("ANY")).alias("resource_type"), - pl.coalesce(pl.col("pilot_type"), pl.col("pilot_type_right")).fill_null(pl.lit("ANY")).alias("pilot_type"), - pl.col("nQueue").fill_null(0), - pl.col("nReady").fill_null(0), - pl.col("nRunning").fill_null(0), - pl.col("nNewWorkers").fill_null(0), - pl.col("n_jobs").fill_null(0), - ] - ) - .select( - # Drop the temporary *_right columns after coalesce - pl.all().exclude(["resource_type_right", "pilot_type_right"]) - ) - ) - # tmp_log.debug(f"DEBUG: joined_df shape: {joined_df.shape}") - # tmp_log.debug(f"DEBUG: joined_df columns: {joined_df.columns}") - # tmp_log.debug(f"DEBUG: joined_df:\n{joined_df}") - - tmp_master_df = ( - joined_df.group_by(["queue_name", "job_type", "resource_type", "pilot_type"]) - .agg( - pl.col("nQueue").max(), - pl.col("nReady").max(), - pl.col("nRunning").max(), - pl.col("nNewWorkers").max(), - pl.col("n_jobs").sum().alias("n_activated_jobs"), - ) - .sort( - [ - "queue_name", - pl.when(pl.col("job_type") == "ANY").then(1).otherwise(0), - "job_type", - pl.when(pl.col("resource_type") == "ANY").then(1).otherwise(0), - "resource_type", - pl.when(pl.col("pilot_type") == "ANY").then(2).when(pl.col("pilot_type") == DEFAULT_PILOT_TYPE).then(0).otherwise(1), - "pilot_type", - ] - ) - ) - # tmp_log.debug(f"master_df: \n{tmp_master_df}") + tmp_master_df = self._build_master_df(joined_df, queue_name, static_num_workers, tmp_log) master_df = tmp_master_df.clone() - tmp_static_num_workers = copy.deepcopy(static_num_workers) # update tmp_static_num_workers with tmp_master_df - for row in tmp_master_df.iter_rows(named=True): - queue_name_from_row = row["queue_name"] - job_type = row["job_type"] - resource_type = row["resource_type"] - pilot_type = row["pilot_type"] - # create missing keys in nested dictionary - if queue_name_from_row not in tmp_static_num_workers: - tmp_static_num_workers[queue_name_from_row] = {} - if job_type not in tmp_static_num_workers[queue_name_from_row]: - tmp_static_num_workers[queue_name_from_row][job_type] = {} - if resource_type not in tmp_static_num_workers[queue_name_from_row][job_type]: - tmp_static_num_workers[queue_name_from_row][job_type][resource_type] = {} - if pilot_type not in tmp_static_num_workers[queue_name_from_row][job_type][resource_type]: - tmp_static_num_workers[queue_name_from_row][job_type][resource_type][pilot_type] = {} - # update values - tmp_static_num_workers[queue_name_from_row][job_type][resource_type][pilot_type].update( - { - "nQueue": row["nQueue"], - "nReady": row["nReady"], - "nRunning": row["nRunning"], - "nNewWorkers": row["nNewWorkers"], - } - ) + self._sync_master_df_to_static_workers(tmp_master_df, queue_name, tmp_static_num_workers, tmp_log) # set initial nNewWorkers for pilot types based on number of activated jobs and the activate worker factor - for job_type in tmp_static_num_workers[queue_name]: - for resource_type, pilot_type_dict in tmp_static_num_workers[queue_name][job_type].items(): - total_n_new_workers = pilot_type_dict["ANY"]["nNewWorkers"] - if total_n_new_workers <= 0: - continue - # calculate the total number of new workers needed for prioritized pilot types - remaining_n_new_workers = total_n_new_workers - activate_worker_factor = self.get_activate_worker_factor(queue_name, job_type, resource_type, queue_dict, queue_config) - prio_ptype_result = tmp_master_df.filter( - (pl.col("queue_name") == queue_name) - & (pl.col("job_type") == job_type) - & (pl.col("resource_type") == resource_type) - & (pl.col("pilot_type").is_in(prioritized_pilot_types)) - ).select([pl.col("n_activated_jobs").sum(), pl.col("nQueue").sum()]) - if prio_ptype_result.shape[0] > 0: - total_prio_ptype_n_activated_jobs, total_prio_ptype_nQueue = prio_ptype_result.row(0) - else: - total_prio_ptype_n_activated_jobs, total_prio_ptype_nQueue = 0, 0 - total_prio_ptype_calculated_n_new_workers = max( - int(total_prio_ptype_n_activated_jobs * activate_worker_factor) - total_prio_ptype_nQueue, 0 - ) - if total_prio_ptype_calculated_n_new_workers > 0: - adjust_ratio = min(total_n_new_workers / total_prio_ptype_calculated_n_new_workers, 1) - for pilot_type, tmp_val in pilot_type_dict.items(): - if pilot_type in prioritized_pilot_types: - pt_result = tmp_master_df.filter( - (pl.col("queue_name") == queue_name) - & (pl.col("job_type") == job_type) - & (pl.col("resource_type") == resource_type) - & (pl.col("pilot_type") == pilot_type) - ).select([pl.col("n_activated_jobs"), pl.col("nQueue")]) - if pt_result.shape[0] > 0: - n_activated_jobs, nQueue = pt_result.row(0) - else: - n_activated_jobs, nQueue = 0, 0 - calculated_n_new_workers = int(max(int(n_activated_jobs * activate_worker_factor) - nQueue, 0) * adjust_ratio) - if calculated_n_new_workers <= 0: - continue - tmp_static_num_workers[queue_name][job_type][resource_type][pilot_type]["nNewWorkers"] = calculated_n_new_workers - static_num_workers[queue_name].setdefault(job_type, {}).setdefault(resource_type, {}).setdefault( - pilot_type, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0} - )["nNewWorkers"] = calculated_n_new_workers - remaining_n_new_workers -= calculated_n_new_workers - master_df = master_df.with_columns( - pl.when( - (pl.col("queue_name") == queue_name) - & (pl.col("job_type") == job_type) - & (pl.col("resource_type") == resource_type) - & (pl.col("pilot_type") == pilot_type) - ) - .then(pl.lit(calculated_n_new_workers)) - .otherwise(pl.col("nNewWorkers")) - .alias("nNewWorkers") - ) - tmp_log.debug( - f"Set initial nNewWorkers to {calculated_n_new_workers} for queue={queue_name} job_type={job_type} resource_type={resource_type} pilot_type={pilot_type}" - ) - if remaining_n_new_workers > 0: - # add remaining n_new_workers to DEFAULT_PILOT_TYPE PR - tmp_static_num_workers[queue_name][job_type][resource_type].setdefault( - DEFAULT_PILOT_TYPE, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0} - ) - tmp_static_num_workers[queue_name][job_type][resource_type][DEFAULT_PILOT_TYPE]["nNewWorkers"] += remaining_n_new_workers - static_num_workers[queue_name].setdefault(job_type, {}).setdefault(resource_type, {}).setdefault( - DEFAULT_PILOT_TYPE, {"nReady": 0, "nRunning": 0, "nQueue": 0, "nNewWorkers": 0} - )["nNewWorkers"] = tmp_static_num_workers[queue_name][job_type][resource_type][DEFAULT_PILOT_TYPE]["nNewWorkers"] - tmp_log.debug( - f"Set remaining nNewWorkers to {remaining_n_new_workers} for queue={queue_name} job_type={job_type} resource_type={resource_type} pilot_type={DEFAULT_PILOT_TYPE}" - ) - master_df = master_df.with_columns( - pl.when( - (pl.col("queue_name") == queue_name) - & (pl.col("job_type") == job_type) - & (pl.col("resource_type") == resource_type) - & (pl.col("pilot_type") == DEFAULT_PILOT_TYPE) - ) - .then(pl.lit(remaining_n_new_workers)) - .otherwise(pl.col("nNewWorkers")) - .alias("nNewWorkers") - ) + master_df = self._set_initial_new_workers( + tmp_master_df, tmp_static_num_workers, static_num_workers, queue_name, master_df, queue_dict, queue_config, prioritized_pilot_types, tmp_log + ) display_master_df = master_df.select( ["job_type", "resource_type", "pilot_type", "nQueue", "nReady", "nRunning", "nNewWorkers", "n_activated_jobs"] ) @@ -759,6 +857,7 @@ def _normalize_job_type_any(queue_dict): tmp_log.debug(ret_msg) else: max_new_workers_per_cycle = queue_config.maxNewWorkersPerCycle + if len(dyn_num_workers[queue_name]) > 1: total_new_workers_rts = 0 for _jt in dyn_num_workers[queue_name]: @@ -766,6 +865,7 @@ def _normalize_job_type_any(queue_dict): if _jt != "ANY" and _rt != "ANY": for _pt in dyn_num_workers[queue_name][_jt][_rt]: total_new_workers_rts = total_new_workers_rts + dyn_num_workers[queue_name][_jt][_rt][_pt]["nNewWorkers"] + n_new_workers_max_agg = min(max(n_queue_limit - n_queue_total, 0), max(max_workers - n_queue_total - n_ready_total - n_running_total, 0)) if max_new_workers_per_cycle >= 0: n_new_workers_max_agg = min(n_new_workers_max_agg, max_new_workers_per_cycle) @@ -830,41 +930,7 @@ def _normalize_job_type_any(queue_dict): # dump tmp_log.debug(f"defined {str(dyn_num_workers)}") # print result in table - dyn_num_workers_rows = [] - for queue_name, job_types in dyn_num_workers.items(): - for job_type, resource_types in job_types.items(): - for resource_type, pilot_types in resource_types.items(): - for pilot_type, worker_data in pilot_types.items(): - dyn_num_workers_rows.append( - { - "queue_name": queue_name, - "job_type": job_type, - "resource_type": resource_type, - "pilot_type": pilot_type, - "nQueue": worker_data.get("nQueue", 0), - "nReady": worker_data.get("nReady", 0), - "nRunning": worker_data.get("nRunning", 0), - "nNewWorkers": worker_data.get("nNewWorkers", 0), - } - ) - if dyn_num_workers_rows: - result_df = ( - pl.DataFrame(dyn_num_workers_rows) - .select(pl.all().exclude(["queue_name"])) - .sort( - [ - pl.when(pl.col("job_type") == "ANY").then(1).otherwise(0), - "job_type", - pl.when(pl.col("resource_type") == "ANY").then(1).otherwise(0), - "resource_type", - pl.when(pl.col("pilot_type") == "ANY").then(2).when(pl.col("pilot_type") == DEFAULT_PILOT_TYPE).then(0).otherwise(1), - "pilot_type", - ] - ) - ) - tmp_log.debug(f"result_df:\n{result_df}") - else: - tmp_log.debug("result_df: nothing to display") + self._format_result_dataframe(dyn_num_workers, queue_name, tmp_log) return dyn_num_workers except Exception: # dump error diff --git a/pandaharvester/panda_pkg_info.py b/pandaharvester/panda_pkg_info.py index 504f8628..5b95e65a 100644 --- a/pandaharvester/panda_pkg_info.py +++ b/pandaharvester/panda_pkg_info.py @@ -1 +1 @@ -release_version = "0.7.4" +release_version = "0.7.5"