diff --git a/navsim/planning/script/run_create_submission_pickle.py b/navsim/planning/script/run_create_submission_pickle.py index 3b473d8..0d488aa 100644 --- a/navsim/planning/script/run_create_submission_pickle.py +++ b/navsim/planning/script/run_create_submission_pickle.py @@ -20,6 +20,26 @@ CONFIG_NAME = "default_run_create_submission_pickle" +from navsim.planning.script.builders.worker_pool_builder import build_worker +from nuplan.planning.utils.multithreading.worker_utils import worker_map +def get_traj(args): + output = [] + for arg_dict in tqdm(args, desc="Running evaluation"): + input_loader = arg_dict['input_loader'] + token = arg_dict['token'] + agent = arg_dict['agent'] + + try: + agent_input = input_loader.get_agent_input_from_token(token) + trajectory = agent.compute_trajectory(agent_input) + output.append([token, trajectory]) + + except Exception: + logger.warning(f"----------- Agent failed for token {token}:") + traceback.print_exc() + + return output + def run_test_evaluation( cfg: DictConfig, agent: AbstractAgent, @@ -44,6 +64,7 @@ def run_test_evaluation( Thus, agent.requires_scene has to be False for the agent that is to be evaluated. """ ) + logger.info("Building Agent Input Loader") input_loader = SceneLoader( data_path=data_path, @@ -55,30 +76,24 @@ def run_test_evaluation( ) agent.initialize() - # first stage output + worker = build_worker(cfg) + data_points = [{"input_loader": input_loader, "agent": agent, "token": token,} for token in input_loader.tokens_stage_one] + first_traj = worker_map(worker, get_traj, data_points) first_stage_output: Dict[str, Trajectory] = {} - for token in tqdm(input_loader.tokens_stage_one, desc="Running first stage evaluation"): - try: - agent_input = input_loader.get_agent_input_from_token(token) - trajectory = agent.compute_trajectory(agent_input) - first_stage_output.update({token: trajectory}) - except Exception: - logger.warning(f"----------- Agent failed for token {token}:") - traceback.print_exc() - # second stage output + for first_dict_list in first_traj: + first_stage_output.update({first_dict_list[0]:first_dict_list[1]}) + scene_loader_tokens_stage_two = input_loader.reactive_tokens_stage_two second_stage_output: Dict[str, Trajectory] = {} - for token in tqdm(scene_loader_tokens_stage_two, desc="Running second stage evaluation"): - try: - agent_input = input_loader.get_agent_input_from_token(token) - trajectory = agent.compute_trajectory(agent_input) - second_stage_output.update({token: trajectory}) - except Exception: - logger.warning(f"----------- Agent failed for token {token}:") - traceback.print_exc() + + data_points = [{"input_loader": input_loader, "agent": agent, "token": token,} for token in scene_loader_tokens_stage_two] + sec_traj = worker_map(worker, get_traj, data_points) + + for sec_dict_list in sec_traj: + second_stage_output.update({sec_dict_list[0]:sec_dict_list[1]}) return first_stage_output, second_stage_output diff --git a/navsim/planning/script/run_create_submission_pickle_challenge.py b/navsim/planning/script/run_create_submission_pickle_challenge.py index 311c933..98e83c8 100644 --- a/navsim/planning/script/run_create_submission_pickle_challenge.py +++ b/navsim/planning/script/run_create_submission_pickle_challenge.py @@ -19,6 +19,25 @@ CONFIG_PATH = "config/pdm_scoring" CONFIG_NAME = "default_run_create_submission_pickle" +from navsim.planning.script.builders.worker_pool_builder import build_worker +from nuplan.planning.utils.multithreading.worker_utils import worker_map +def get_traj(args): + output = [] + for arg_dict in tqdm(args, desc="Running evaluation"): + input_loader = arg_dict['input_loader'] + token = arg_dict['token'] + agent = arg_dict['agent'] + + try: + agent_input = input_loader.get_agent_input_from_token(token) + trajectory = agent.compute_trajectory(agent_input) + output.append([token, trajectory]) + + except Exception: + logger.warning(f"----------- Agent failed for token {token}:") + traceback.print_exc() + + return output def run_test_evaluation( cfg: DictConfig, @@ -56,32 +75,26 @@ def run_test_evaluation( agent.initialize() # first stage output + worker = build_worker(cfg) + data_points = [{"input_loader": input_loader, "agent": agent, "token": token,} for token in input_loader.tokens_stage_one] + first_traj = worker_map(worker, get_traj, data_points) first_stage_output: Dict[str, Trajectory] = {} - for token in tqdm(input_loader.tokens_stage_one, desc="Running first stage evaluation"): - try: - agent_input = input_loader.get_agent_input_from_token(token) - trajectory = agent.compute_trajectory(agent_input) - first_stage_output.update({token: trajectory}) - except Exception: - logger.warning(f"----------- Agent failed for token {token}:") - traceback.print_exc() - # second stage output + for first_dict_list in first_traj: + first_stage_output.update({first_dict_list[0]:first_dict_list[1]}) + # second stage output scene_loader_tokens_stage_two = input_loader.reactive_tokens_stage_two second_stage_output: Dict[str, Trajectory] = {} - for token in tqdm(scene_loader_tokens_stage_two, desc="Running second stage evaluation"): - try: - agent_input = input_loader.get_agent_input_from_token(token) - trajectory = agent.compute_trajectory(agent_input) - second_stage_output.update({token: trajectory}) - except Exception: - logger.warning(f"----------- Agent failed for token {token}:") - traceback.print_exc() - return first_stage_output, second_stage_output + data_points = [{"input_loader": input_loader, "agent": agent, "token": token,} for token in scene_loader_tokens_stage_two] + sec_traj = worker_map(worker, get_traj, data_points) + + for sec_dict_list in sec_traj: + second_stage_output.update({sec_dict_list[0]:sec_dict_list[1]}) + return first_stage_output, second_stage_output @hydra.main(config_path=CONFIG_PATH, config_name=CONFIG_NAME, version_base=None) def main(cfg: DictConfig) -> None: