diff --git a/src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py b/src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py index dc8a80303..639a703b4 100644 --- a/src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py +++ b/src/cloudai/systems/slurm/strategy/slurm_command_gen_strategy.py @@ -316,6 +316,18 @@ def _metadata_cmd(self, slurm_args: dict[str, Any], tr: TestRun) -> str: ] ) + def _enable_vboost_cmd(self, slurm_args: dict[str, Any], tr: TestRun) -> str: + return " ".join( + [ + *self.gen_srun_prefix(slurm_args, tr), + f"--output={tr.output_path.absolute() / 'vboost.out'}", + f"--error={tr.output_path.absolute() / 'vboost.err'}", + "bash", + "-c", + '"sudo nvidia-smi boost-slider --vboost 1"', + ] + ) + def _write_sbatch_script( self, slurm_args: Dict[str, Any], env_vars: Dict[str, Union[str, List[str]]], srun_command: str, tr: TestRun ) -> str: @@ -341,6 +353,8 @@ def _write_sbatch_script( batch_script_content.extend([self._format_env_vars(env_vars)]) + if env_vars.get("ENABLE_VBOOST") == "1": + batch_script_content.extend([self._enable_vboost_cmd(slurm_args, tr), ""]) batch_script_content.extend([self._ranks_mapping_cmd(slurm_args, tr), ""]) batch_script_content.extend([self._metadata_cmd(slurm_args, tr), ""]) diff --git a/src/cloudai/workloads/nemo_run/cloudai_nemorun.py b/src/cloudai/workloads/nemo_run/cloudai_nemorun.py index 0a119c602..789cea21a 100644 --- a/src/cloudai/workloads/nemo_run/cloudai_nemorun.py +++ b/src/cloudai/workloads/nemo_run/cloudai_nemorun.py @@ -15,34 +15,137 @@ # limitations under the License. 
import os +from datetime import timedelta +from typing import Optional import lightning.pytorch as pl import nemo_run as run import torch +from lightning.pytorch.loggers import TensorBoardLogger +from lightning.pytorch.loggers.wandb import WandbLogger from megatron.core.distributed import DistributedDataParallelConfig from megatron.core.optimizer import OptimizerConfig from nemo import lightning as nl from nemo.collections import llm from nemo.collections.common.tokenizers.huggingface import AutoTokenizer +from nemo.collections.common.tokenizers.tokenizer_spec import TokenizerSpec from nemo.collections.llm.gpt.data.mock import MockDataModule -from nemo.collections.llm.gpt.data.packed_sequence import PackedSequenceSpecs -from nemo.collections.llm.gpt.data.squad import SquadDataModule -from nemo.collections.llm.gpt.model.nemotron import ( - Nemotron4Config15B, - NemotronModel, +from nemo.collections.llm.gpt.model.llama import Llama3Config8B, Llama3Config70B, Llama31Config405B, LlamaModel +from nemo.collections.llm.gpt.model.nemotron import Nemotron4Config15B, Nemotron4Config340B, NemotronModel +from nemo.collections.llm.recipes.nemotron3_8b import pretrain_recipe as nemotron3_8b_recipe +from nemo.collections.llm.recipes.tp_overlap_configs.userbuffers import ( + BulkOverlapCfg, + PipelineOverlapCfg, + RingExchangeOverlapCfg, + TransformerLayerTPOverlapCfg, ) from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer +from nemo.lightning import AutoResume, NeMoLogger +from nemo.lightning.pytorch.callbacks import ModelCheckpoint from nemo.lightning.pytorch.callbacks.garbage_collection import GarbageCollectionCallback from nemo.lightning.pytorch.callbacks.megatron_comm_overlap import MegatronCommOverlapCallback from nemo.lightning.pytorch.callbacks.nsys import NsysCallback +from nemo.lightning.pytorch.optim import CosineAnnealingScheduler from nemo.utils.exp_manager import TimingCallback +@run.cli.factory(is_target_default=True) +def default_log( + 
dir: Optional[str] = None, + name: str = "default", + tensorboard_logger: Optional[run.Config[TensorBoardLogger]] = None, + wandb_logger: Optional[run.Config[WandbLogger]] = None, +) -> run.Config[NeMoLogger]: + ckpt = run.Config( + ModelCheckpoint, + save_last=False, + save_top_k=10, + train_time_interval=run.Config(timedelta, minutes=15), + filename="{model_name}--{val_loss:.2f}-{step}-{consumed_samples}", + ) + + # Default TensorBoard logger if not provided + if tensorboard_logger is None: + tensorboard_logger = run.Config(TensorBoardLogger, save_dir="tb_logs", name=name) + + return run.Config( + NeMoLogger, + ckpt=ckpt, + name=name, + tensorboard=tensorboard_logger, + wandb=wandb_logger, + log_dir=dir, + ) + + +@run.cli.factory(is_target_default=True) +def default_resume(resume_if_exists=True, resume_ignore_no_checkpoint=True) -> run.Config[AutoResume]: + return run.Config( + AutoResume, + resume_if_exists=resume_if_exists, + resume_ignore_no_checkpoint=resume_ignore_no_checkpoint, + ) + + @run.cli.factory @run.autoconvert -def hf_tokenizer() -> run.Config[AutoTokenizer]: +def hf_tokenizer_llama3_8b() -> run.Config[AutoTokenizer]: model_name = "meta-llama/Meta-Llama-3-8B" + return run.Config( + AutoTokenizer, + pretrained_model_name=model_name, + use_fast=True, + ) + + +@run.cli.factory +@run.autoconvert +def hf_tokenizer_llama3_70b() -> run.Config[AutoTokenizer]: + model_name = "meta-llama/Meta-Llama-3-70B" + return run.Config( + AutoTokenizer, + pretrained_model_name=model_name, + use_fast=True, + ) + + +@run.cli.factory +@run.autoconvert +def hf_tokenizer_llama3_405b() -> run.Config[AutoTokenizer]: + model_name = "meta-llama/Llama-3.1-405B" + return run.Config( + AutoTokenizer, + pretrained_model_name=model_name, + use_fast=True, + ) + + +@run.cli.factory +@run.autoconvert +def hf_tokenizer_nemotron3_8b() -> run.Config[AutoTokenizer]: + model_name = "nvidia/nemotron-3-8b" + return run.Config( + AutoTokenizer, + pretrained_model_name=model_name, + 
use_fast=True, + ) + + +@run.cli.factory +@run.autoconvert +def hf_tokenizer_nemotron4_15b() -> run.Config[AutoTokenizer]: + model_name = "nvidia/nemotron-4-15b" + return run.Config( + AutoTokenizer, + pretrained_model_name=model_name, + use_fast=True, + ) + +@run.cli.factory +@run.autoconvert +def hf_tokenizer_nemotron4_340b() -> run.Config[AutoTokenizer]: + model_name = "nvidia/nemotron-4-340b" return run.Config( AutoTokenizer, pretrained_model_name=model_name, @@ -50,10 +153,10 @@ def hf_tokenizer() -> run.Config[AutoTokenizer]: ) -@run.cli.factory(target=MockDataModule, target_arg="tokenizer") +@run.cli.factory(target=TokenizerSpec) @run.autoconvert -def null_tokenizer() -> run.Config[AutoTokenizer]: - return run.Config(get_nmt_tokenizer, library="null", model_name="NullTokenizer", vocab_size=256000) +def null_tokenizer(vocab_size: int = 256000) -> run.Config[TokenizerSpec]: + return run.Config(get_nmt_tokenizer, library="null", model_name="NullTokenizer", vocab_size=vocab_size) @run.cli.factory @@ -85,7 +188,7 @@ def nsys_callbacks() -> list[pl.Callback]: @run.cli.factory @run.autoconvert -def comms_overlap_callbacks_lora() -> list[pl.Callback]: +def comms_overlap_callbacks() -> list[pl.Callback]: return [ timing_callback(), run.Config(MegatronCommOverlapCallback, tp_comm_overlap=False), @@ -94,57 +197,404 @@ def comms_overlap_callbacks_lora() -> list[pl.Callback]: @run.cli.factory @run.autoconvert -def comms_overlap_callbacks_pretrain() -> list[pl.Callback]: - return [ - timing_callback(), - run.Config(MegatronCommOverlapCallback, overlap_param_gather_with_optimizer_step=False), - ] +def llama3_70b_bf16_tp_overlap_config() -> run.Config[TransformerLayerTPOverlapCfg]: + return run.Config( + TransformerLayerTPOverlapCfg, + qkv_dgrad=run.Config( + BulkOverlapCfg, + cga_size=2, + method="bulk", + num_sm=4, + set_sm_margin=False, + ), + qkv_wgrad=run.Config( + BulkOverlapCfg, + cga_size=2, + method="bulk", + num_sm=24, + set_sm_margin=False, + ), + 
qkv_fprop=run.Config( + RingExchangeOverlapCfg, + aggregate=False, + method="ring_exchange", + num_sm=1, + set_sm_margin=False, + ), + proj_dgrad=run.Config( + RingExchangeOverlapCfg, + aggregate=False, + method="ring_exchange", + num_sm=1, + set_sm_margin=False, + ), + proj_fprop=run.Config( + PipelineOverlapCfg, + num_sm=24, + cga_size=2, + num_splits=4, + set_sm_margin=True, + fp8_buf=False, + ), + fc1_dgrad=run.Config( + BulkOverlapCfg, + num_sm=2, + cga_size=2, + set_sm_margin=False, + method="bulk", + ), + fc1_wgrad=run.Config( + BulkOverlapCfg, + num_sm=4, + cga_size=2, + set_sm_margin=False, + method="bulk", + ), + fc1_fprop=run.Config( + RingExchangeOverlapCfg, + aggregate=False, + method="ring_exchange", + num_sm=1, + set_sm_margin=False, + ), + fc2_dgrad=run.Config( + RingExchangeOverlapCfg, + aggregate=False, + method="ring_exchange", + num_sm=1, + set_sm_margin=False, + ), + fc2_fprop=run.Config( + PipelineOverlapCfg, + num_sm=16, + cga_size=2, + num_splits=4, + set_sm_margin=True, + fp8_buf=False, + ), + ) @run.cli.factory @run.autoconvert -def combined_callbacks_lora() -> list[pl.Callback]: - start_step = 5 - end_step = 10 - return [ - timing_callback(), - run.Config( - MegatronCommOverlapCallback, - tp_comm_overlap=False, +def llama3_70b_fp8_tp_overlap_config() -> run.Config[TransformerLayerTPOverlapCfg]: + return run.Config( + TransformerLayerTPOverlapCfg, + qkv_dgrad=run.Config( + BulkOverlapCfg, + cga_size=2, + method="bulk", + num_sm=4, + set_sm_margin=False, ), - run.Config(GarbageCollectionCallback, gc_interval_train=start_step, gc_interval_val=end_step), - ] + qkv_wgrad=run.Config( + BulkOverlapCfg, + cga_size=2, + method="bulk", + num_sm=4, + set_sm_margin=False, + ), + qkv_fprop=run.Config( + RingExchangeOverlapCfg, + aggregate=False, + method="ring_exchange", + num_sm=1, + set_sm_margin=False, + ), + proj_dgrad=run.Config( + RingExchangeOverlapCfg, + aggregate=False, + method="ring_exchange", + num_sm=1, + set_sm_margin=False, + ), + 
proj_fprop=run.Config( + PipelineOverlapCfg, + num_sm=24, + cga_size=2, + num_splits=4, + set_sm_margin=True, + fp8_buf=True, + method="pipeline", + ), + fc1_dgrad=run.Config( + BulkOverlapCfg, + num_sm=2, + cga_size=2, + set_sm_margin=False, + method="bulk", + ), + fc1_wgrad=run.Config( + BulkOverlapCfg, + num_sm=4, + cga_size=2, + set_sm_margin=False, + method="bulk", + ), + fc1_fprop=run.Config( + RingExchangeOverlapCfg, + aggregate=False, + method="ring_exchange", + num_sm=1, + set_sm_margin=False, + ), + fc2_dgrad=run.Config( + RingExchangeOverlapCfg, + aggregate=False, + method="ring_exchange", + num_sm=1, + set_sm_margin=False, + ), + fc2_fprop=run.Config( + PipelineOverlapCfg, + num_sm=16, + cga_size=2, + num_splits=4, + set_sm_margin=True, + fp8_buf=False, + method="pipeline", + ), + ) -@run.cli.factory -@run.autoconvert -def combined_callbacks_pretrain() -> list[pl.Callback]: - start_step = 5 - end_step = 10 - return [ - timing_callback(), - run.Config(MegatronCommOverlapCallback, overlap_param_gather_with_optimizer_step=False), - run.Config( - NsysCallback, - start_step=start_step, - end_step=end_step, +# LLAMA3 8B Recipe +@run.cli.factory(target=llm.pretrain) +def cloudai_llama3_8b_recipe() -> run.Partial: + recipe = run.Partial( + llm.pretrain, + model=run.Config(LlamaModel, config=Llama3Config8B()), + trainer=run.Config( + nl.Trainer, + devices=8, + num_nodes=1, + accelerator="gpu", + max_steps=10, + limit_test_batches=50, + limit_val_batches=32, + log_every_n_steps=10, + accumulate_grad_batches=1, + use_distributed_sampler=False, + plugins=run.Config( + nl.MegatronMixedPrecision, + precision="bf16-mixed", + params_dtype=torch.bfloat16, + pipeline_dtype=torch.bfloat16, + autocast_enabled=False, + grad_reduce_in_fp32=False, + ), + strategy=run.Config( + nl.MegatronStrategy, + tensor_model_parallel_size=1, + pipeline_model_parallel_size=1, + context_parallel_size=2, + virtual_pipeline_model_parallel_size=None, + sequence_parallel=False, + 
gradient_as_bucket_view=True, + ddp=run.Config( + DistributedDataParallelConfig, + check_for_nan_in_grad=True, + grad_reduce_in_fp32=True, + overlap_grad_reduce=True, + overlap_param_gather=True, + average_in_collective=True, + ), + ), + num_sanity_val_steps=0, + val_check_interval=1000, + max_epochs=10, + callbacks=[ + run.Config(NsysCallback, start_step=5, end_step=10), + run.Config(TimingCallback), + ], ), - run.Config(GarbageCollectionCallback, gc_interval_train=100, gc_interval_val=100), - ] + data=run.Config( + MockDataModule, + seq_length=8192, + micro_batch_size=1, + global_batch_size=32, + tokenizer=null_tokenizer(vocab_size=128256), + ), + optim=run.Config( + nl.MegatronOptimizerModule, + config=run.Config( + OptimizerConfig, + optimizer="adam", + lr=3e-4, + bf16=True, + fp16=False, + params_dtype=torch.bfloat16, + use_distributed_optimizer=True, + weight_decay=0.1, + adam_beta1=0.9, + adam_beta2=0.95, + adam_eps=1e-05, + clip_grad=1.0, + ), + lr_scheduler=run.Config( + CosineAnnealingScheduler, + warmup_steps=2000, + constant_steps=0, + min_lr=2.9999999999999997e-05, + ), + ), + resume=default_resume(), + log=default_log(), + ) + return recipe -@run.cli.factory(target=SquadDataModule, target_arg="packed_sequence_specs") +# LLAMA3 70B Recipe +@run.cli.factory(target=llm.pretrain) +def cloudai_llama3_70b_recipe() -> run.Partial: + recipe = run.Partial( + llm.pretrain, + model=run.Config(LlamaModel, config=Llama3Config70B()), + data=run.Config( + MockDataModule, + seq_length=8192, + micro_batch_size=1, + global_batch_size=8, + tokenizer=null_tokenizer(vocab_size=128256), + ), + trainer=run.Config( + nl.Trainer, + devices=8, + num_nodes=1, + accelerator="gpu", + max_steps=10, + limit_test_batches=50, + limit_val_batches=32, + log_every_n_steps=10, + plugins=run.Config( + nl.MegatronMixedPrecision, + precision="bf16-mixed", + params_dtype=torch.bfloat16, + pipeline_dtype=torch.bfloat16, + autocast_enabled=False, + grad_reduce_in_fp32=False, + ), + 
strategy=run.Config( + nl.MegatronStrategy, + tensor_model_parallel_size=4, + pipeline_model_parallel_size=1, + context_parallel_size=1, + virtual_pipeline_model_parallel_size=None, + sequence_parallel=True, + pipeline_dtype=torch.bfloat16, + ddp=run.Config( + DistributedDataParallelConfig, + check_for_nan_in_grad=True, + grad_reduce_in_fp32=True, + overlap_grad_reduce=True, + overlap_param_gather=True, + ), + ), + num_sanity_val_steps=0, + val_check_interval=1000, + max_epochs=10, + callbacks=[ + run.Config( + MegatronCommOverlapCallback, + tp_comm_overlap=True, + tp_comm_overlap_cfg=llama3_70b_bf16_tp_overlap_config(), + overlap_grad_reduce=True, + overlap_param_gather=True, + ), + timing_callback(), + ], + ), + optim=run.Config( + nl.MegatronOptimizerModule, + config=run.Config( + OptimizerConfig, + lr=1e-4, + bf16=True, + params_dtype=torch.bfloat16, + use_distributed_optimizer=True, + weight_decay=0, + ), + ), + resume=run.Config( + nl.AutoResume, + resume_if_exists=True, + resume_ignore_no_checkpoint=True, + resume_past_end=True, + ), + ) + return recipe + + +# LLAMA3 405B Recipe +@run.cli.factory(target=llm.pretrain) @run.autoconvert -def packed_sequence_data_lora() -> run.Config[PackedSequenceSpecs]: - return run.Config(PackedSequenceSpecs, pad_cu_seqlens=False, packed_sequence_size=4096) +def cloudai_llama3_405b_recipe() -> run.Partial: + recipe = run.Partial( + llm.pretrain, + model=run.Config(LlamaModel, config=Llama31Config405B()), + data=run.Config( + MockDataModule, + seq_length=8192, + micro_batch_size=1, + global_batch_size=8, + tokenizer=null_tokenizer(128256), + ), + trainer=run.Config( + nl.Trainer, + devices=8, + num_nodes=1, + accelerator="gpu", + max_steps=10, + limit_test_batches=50, + limit_val_batches=32, + log_every_n_steps=10, + strategy=run.Config( + nl.MegatronStrategy, + tensor_model_parallel_size=8, + pipeline_model_parallel_size=1, + context_parallel_size=2, + virtual_pipeline_model_parallel_size=None, + sequence_parallel=False, + 
pipeline_dtype=torch.bfloat16, + ddp=run.Config( + DistributedDataParallelConfig, + check_for_nan_in_grad=True, + grad_reduce_in_fp32=True, + overlap_grad_reduce=True, + overlap_param_gather=True, + ), + ), + num_sanity_val_steps=0, + val_check_interval=1000, + max_epochs=10, + ), + optim=run.Config( + nl.MegatronOptimizerModule, + config=run.Config( + OptimizerConfig, + lr=1e-4, + bf16=True, + params_dtype=torch.bfloat16, + use_distributed_optimizer=True, + weight_decay=0, + ), + ), + resume=run.Config( + nl.AutoResume, + resume_if_exists=True, + resume_ignore_no_checkpoint=True, + resume_past_end=True, + ), + ) + return recipe +# NEMOTRON3 8B Recipe @run.cli.factory(target=llm.pretrain) @run.autoconvert -def cloudai_recipe() -> run.Partial: +def cloudai_nemotron3_8b_recipe() -> run.Partial: recipe = run.Partial( llm.pretrain, - model=run.Config(NemotronModel, config=run.Config(Nemotron4Config15B)), + model=run.Config(nemotron3_8b_recipe(performance_mode=True)), data=run.Config( MockDataModule, seq_length=2048, @@ -199,13 +649,194 @@ def cloudai_recipe() -> run.Partial: resume_past_end=True, ), ) + return recipe + + +# NEMOTRON4 15B Recipe +@run.cli.factory(target=llm.pretrain) +def cloudai_nemotron4_15b_recipe() -> run.Partial: + recipe = run.Partial( + llm.pretrain, + model=run.Config(NemotronModel, config=Nemotron4Config15B()), + data=run.Config( + MockDataModule, + seq_length=4096, + micro_batch_size=1, + global_batch_size=8, + tokenizer=null_tokenizer(vocab_size=256000), + ), + trainer=run.Config( + nl.Trainer, + devices=8, + num_nodes=2, + accelerator="gpu", + max_steps=10, + limit_test_batches=50, + limit_val_batches=32, + log_every_n_steps=10, + use_distributed_sampler=False, + val_check_interval=150, + plugins=run.Config( + nl.MegatronMixedPrecision, + autocast_enabled=False, + grad_reduce_in_fp32=False, + params_dtype=torch.bfloat16, + pipeline_dtype=torch.bfloat16, + precision="bf16-mixed", + ), + strategy=run.Config( + nl.MegatronStrategy, + 
tensor_model_parallel_size=4, + pipeline_model_parallel_size=1, + context_parallel_size=1, + virtual_pipeline_model_parallel_size=None, + sequence_parallel=True, + pipeline_dtype=None, + gradient_as_bucket_view=True, + ckpt_async_save=True, + ckpt_include_optimizer=True, + ckpt_parallel_load=True, + ddp=run.Config( + DistributedDataParallelConfig, + check_for_nan_in_grad=True, + grad_reduce_in_fp32=True, + overlap_grad_reduce=True, + overlap_param_gather=True, + average_in_collective=True, + ), + ), + num_sanity_val_steps=0, + max_epochs=10, + callbacks=[timing_callback()], + ), + optim=run.Config( + nl.MegatronOptimizerModule, + config=run.Config( + OptimizerConfig, + lr=1e-4, + bf16=True, + params_dtype=torch.bfloat16, + use_distributed_optimizer=True, + weight_decay=0, + ), + ), + resume=run.Config( + nl.AutoResume, + resume_if_exists=True, + resume_ignore_no_checkpoint=True, + resume_past_end=True, + ), + ) + return recipe + +# NEMOTRON4 340B Recipe +@run.cli.factory(target=llm.pretrain) +def cloudai_nemotron4_340b_recipe() -> run.Partial: + recipe = run.Partial( + llm.pretrain, + model=run.Config(NemotronModel, config=Nemotron4Config340B()), + data=run.Config( + MockDataModule, + seq_length=4096, + micro_batch_size=1, + global_batch_size=8, + tokenizer=null_tokenizer(vocab_size=128256), + ), + trainer=run.Config( + nl.Trainer, + devices=8, + num_nodes=1, + accelerator="gpu", + max_steps=10, + limit_test_batches=50, + limit_val_batches=32, + log_every_n_steps=10, + use_distributed_sampler=False, + plugins=run.Config( + nl.MegatronMixedPrecision, + autocast_enabled=False, + grad_reduce_in_fp32=False, + params_dtype=torch.bfloat16, + pipeline_dtype=torch.bfloat16, + precision="bf16-mixed", + ), + strategy=run.Config( + nl.MegatronStrategy, + tensor_model_parallel_size=8, + pipeline_model_parallel_size=8, + context_parallel_size=2, + virtual_pipeline_model_parallel_size=12, + sequence_parallel=True, + pipeline_dtype=torch.bfloat16, + gradient_as_bucket_view=True, 
 + ckpt_async_save=True, + ckpt_include_optimizer=True, + ckpt_parallel_load=True, + ddp=run.Config( + DistributedDataParallelConfig, + check_for_nan_in_grad=True, + grad_reduce_in_fp32=True, + overlap_grad_reduce=True, + overlap_param_gather=True, + average_in_collective=True, + ), + ), + num_sanity_val_steps=0, + val_check_interval=500, + max_epochs=10, + callbacks=[ + run.Config( + MegatronCommOverlapCallback, + tp_comm_overlap=True, + overlap_grad_reduce=True, + overlap_param_gather=True, + ), + timing_callback(), + ], + ), + optim=run.Config( + nl.MegatronOptimizerModule, + config=run.Config( + OptimizerConfig, + lr=1e-4, + bf16=True, + params_dtype=torch.bfloat16, + use_distributed_optimizer=True, + weight_decay=0, + ), + ), + resume=run.Config( + nl.AutoResume, + resume_if_exists=True, + resume_ignore_no_checkpoint=True, + resume_past_end=True, + ), + ) return recipe if __name__ == "__main__": mode = os.getenv("CLOUDAI_NEMO_TASK") - print(f"Running in mode {mode}") + + supported_recipes = [ + "cloudai_llama3_8b_recipe", + "cloudai_llama3_70b_recipe", + "cloudai_llama3_405b_recipe", + "cloudai_nemotron3_8b_recipe", + "cloudai_nemotron4_15b_recipe", + "cloudai_nemotron4_340b_recipe", + ] + + recipe_name = os.getenv("CLOUDAI_NEMO_RECIPE") + + if recipe_name not in supported_recipes: + print( + ( + f"Warning: Using Default Recipe '{recipe_name}'. " + "Advanced CLI features that use ForwardRefs are not supported in Nemo-Run CLI yet." 
+ ) + ) if mode == "pretrain": run.cli.main(fn=llm.pretrain) elif mode == "finetune": diff --git a/src/cloudai/workloads/nemo_run/slurm_command_gen_strategy.py b/src/cloudai/workloads/nemo_run/slurm_command_gen_strategy.py index cd6a05535..57c0d4a0f 100644 --- a/src/cloudai/workloads/nemo_run/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/nemo_run/slurm_command_gen_strategy.py @@ -35,16 +35,29 @@ def _parse_slurm_args( cmd_args: Dict[str, Union[str, List[str]]], tr: TestRun, ) -> Dict[str, Any]: - cloudai_nemo_task = cmd_args.get("task", "") - env_vars["CLOUDAI_NEMO_TASK"] = f"{cloudai_nemo_task}" + tdef: NeMoRunTestDefinition = cast(NeMoRunTestDefinition, tr.test.test_definition) + self._set_additional_env_vars(env_vars, tdef) base_args = super()._parse_slurm_args(job_name_prefix, env_vars, cmd_args, tr) - tdef: NeMoRunTestDefinition = cast(NeMoRunTestDefinition, tr.test.test_definition) base_args.update({"image_path": tdef.docker_image.installed_path}) return base_args + def _set_additional_env_vars(self, env_vars: Dict[str, Union[str, List[str]]], tdef: NeMoRunTestDefinition): + """Set environment variables based on NeMoRunTestDefinition.""" + env_vars["CLOUDAI_NEMO_TASK"] = tdef.cmd_args.task + env_vars["CLOUDAI_NEMO_RECIPE"] = tdef.cmd_args.recipe_name + + pipeline_model_parallel_size = tdef.cmd_args.trainer.strategy.pipeline_model_parallel_size + if isinstance(pipeline_model_parallel_size, list): + pipeline_model_parallel_size = pipeline_model_parallel_size[0] + pipeline_model_parallel_size = int(pipeline_model_parallel_size) + + if pipeline_model_parallel_size > 1: + logging.debug("Setting NCCL_P2P_NET_CHUNKSIZE to 2097152 as pipeline_model_parallel_size is greater than 1") + env_vars["NCCL_P2P_NET_CHUNKSIZE"] = "2097152" + def _run_script(self, tr: TestRun) -> Path: tdef: NeMoRunTestDefinition = cast(NeMoRunTestDefinition, tr.test.test_definition) return tdef.script.installed_path @@ -71,6 +84,25 @@ def append_flattened_dict(self, prefix: str, d: 
Dict[str, Any], command: List[st else: command.append(f"{key}={value}") + def _validate_recipe_name(self, recipe_name: str) -> str: + """Validate the recipe name against the supported list.""" + supported_recipes = [ + "cloudai_llama3_8b_recipe", + "cloudai_llama3_70b_recipe", + "cloudai_llama3_405b_recipe", + "cloudai_nemotron3_8b_recipe", + "cloudai_nemotron4_15b_recipe", + "cloudai_nemotron4_340b_recipe", + ] + + if recipe_name not in supported_recipes: + logging.warning( + f"Using default {recipe_name} in Nemo2.0. " + "Passing advanced CLI options (e.g., factory functions) might not be fully supported in Nemo-Run CLI." + ) + + return recipe_name + def generate_test_command( self, env_vars: Dict[str, Union[str, List[str]]], cmd_args: Dict[str, Union[str, List[str]]], tr: TestRun ) -> List[str]: @@ -81,11 +113,13 @@ def generate_test_command( for non_cmd_arg in {"docker_image_url", "num_layers", "task", "recipe_name"}: cmd_args_dict.pop(non_cmd_arg) + recipe_name = self._validate_recipe_name(tdef.cmd_args.recipe_name) + command = [ "python", f"/cloudai_install/{self._run_script(tr).name}", "--factory", - tdef.cmd_args.recipe_name, + recipe_name, "-y", ] diff --git a/tests/ref_data/nemo-run-no-hook.sbatch b/tests/ref_data/nemo-run-no-hook.sbatch index 80c11f36f..7e0ce1515 100644 --- a/tests/ref_data/nemo-run-no-hook.sbatch +++ b/tests/ref_data/nemo-run-no-hook.sbatch @@ -6,6 +6,7 @@ #SBATCH --partition=main export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +export CLOUDAI_NEMO_RECIPE=llama_3b export CLOUDAI_NEMO_TASK=pretrain srun --export=ALL --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__CLOUDAI_DIR__/src/cloudai/workloads/nemo_run:/cloudai_workspace,__OUTPUT_DIR__/install:/cloudai_install --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank 
\${SLURM_PROCID}." diff --git a/tests/ref_data/nemo-run-pre-test.sbatch b/tests/ref_data/nemo-run-pre-test.sbatch index a31d54b3c..500af41e7 100644 --- a/tests/ref_data/nemo-run-pre-test.sbatch +++ b/tests/ref_data/nemo-run-pre-test.sbatch @@ -6,6 +6,7 @@ #SBATCH --partition=main export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +export CLOUDAI_NEMO_RECIPE=llama_3b export CLOUDAI_NEMO_TASK=pretrain srun --export=ALL --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__CLOUDAI_DIR__/src/cloudai/workloads/nemo_run:/cloudai_workspace,__OUTPUT_DIR__/install:/cloudai_install --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." diff --git a/tests/ref_data/nemo-run-vboost.sbatch b/tests/ref_data/nemo-run-vboost.sbatch new file mode 100644 index 000000000..ec4d33ca6 --- /dev/null +++ b/tests/ref_data/nemo-run-vboost.sbatch @@ -0,0 +1,18 @@ +#!/bin/bash +#SBATCH --job-name=__JOB_NAME__ +#SBATCH -N 1 +#SBATCH --output=__OUTPUT_DIR__/output/stdout.txt +#SBATCH --error=__OUTPUT_DIR__/output/stderr.txt +#SBATCH --partition=main + +export SLURM_JOB_MASTER_NODE=$(scontrol show hostname $SLURM_JOB_NODELIST | head -n 1) +export CLOUDAI_NEMO_RECIPE=llama_3b +export CLOUDAI_NEMO_TASK=pretrain +export ENABLE_VBOOST=1 +srun --export=ALL --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__CLOUDAI_DIR__/src/cloudai/workloads/nemo_run:/cloudai_workspace,__OUTPUT_DIR__/install:/cloudai_install --output=__OUTPUT_DIR__/output/vboost.out --error=__OUTPUT_DIR__/output/vboost.err bash -c "sudo nvidia-smi boost-slider --vboost 1" + +srun --export=ALL --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 
--container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__CLOUDAI_DIR__/src/cloudai/workloads/nemo_run:/cloudai_workspace,__OUTPUT_DIR__/install:/cloudai_install --output=__OUTPUT_DIR__/output/mapping-stdout.txt --error=__OUTPUT_DIR__/output/mapping-stderr.txt bash -c "echo \$(date): \$(hostname):node \${SLURM_NODEID}:rank \${SLURM_PROCID}." + +srun --export=ALL --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__CLOUDAI_DIR__/src/cloudai/workloads/nemo_run:/cloudai_workspace,__OUTPUT_DIR__/install:/cloudai_install --ntasks=1 --ntasks-per-node=1 --output=__OUTPUT_DIR__/output/metadata/node-%N.toml --error=__OUTPUT_DIR__/output/metadata/nodes.err bash /cloudai_install/slurm-metadata.sh + +srun --export=ALL --mpi=pmix --container-image=nvcr.io/nvidia/nemo:24.09 --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__CLOUDAI_DIR__/src/cloudai/workloads/nemo_run:/cloudai_workspace,__OUTPUT_DIR__/install:/cloudai_install python /cloudai_install/cloudai_nemorun.py --factory llama_3b -y trainer.max_steps=100 trainer.val_check_interval=1000 trainer.num_nodes=1 trainer.strategy.tensor_model_parallel_size=1 trainer.strategy.pipeline_model_parallel_size=1 trainer.strategy.context_parallel_size=2 data.micro_batch_size=1 data.global_batch_size=1 \ No newline at end of file diff --git a/tests/slurm_command_gen_strategy/test_nemo_run_slurm_command_gen_strategy.py b/tests/slurm_command_gen_strategy/test_nemo_run_slurm_command_gen_strategy.py index ae16898b9..101e775a2 100644 --- a/tests/slurm_command_gen_strategy/test_nemo_run_slurm_command_gen_strategy.py +++ b/tests/slurm_command_gen_strategy/test_nemo_run_slurm_command_gen_strategy.py @@ -73,6 +73,9 @@ def test_generate_test_command(self, cmd_gen_strategy: NeMoRunSlurmCommandGenStr data=Data(micro_batch_size=1), ) test_run.test.test_definition.cmd_args = cmd_args + + recipe_name = cmd_gen_strategy._validate_recipe_name(cmd_args.recipe_name) + cmd = 
cmd_gen_strategy.generate_test_command( test_run.test.test_definition.extra_env_vars, test_run.test.test_definition.cmd_args.model_dump(), test_run ) @@ -81,7 +84,7 @@ def test_generate_test_command(self, cmd_gen_strategy: NeMoRunSlurmCommandGenStr "python", f"/cloudai_install/{cmd_gen_strategy._run_script(test_run).name}", "--factory", - cmd_args.recipe_name, + recipe_name, "-y", ] assert ( diff --git a/tests/test_acceptance.py b/tests/test_acceptance.py index 86a184a52..6f4ed0577 100644 --- a/tests/test_acceptance.py +++ b/tests/test_acceptance.py @@ -234,6 +234,7 @@ def build_special_test_run( "nemo-launcher", "nemo-run-pre-test", "nemo-run-no-hook", + "nemo-run-vboost", "slurm_container", "megatron-run", ] @@ -291,13 +292,36 @@ def test_req(request, slurm_system: SlurmSystem, partial_tr: partial[TestRun]) - ), MegatronRunSlurmCommandGenStrategy, ), + "nemo-run": lambda: create_test_run( + partial_tr, + slurm_system, + "nemo-run", + NeMoRunTestDefinition( + name="nemo-run", + description="Test enabling vboost", + test_template_name="nemo-run", + cmd_args=NeMoRunCmdArgs( + docker_image_url="nvcr.io/nvidia/nemo:24.09", + task="pretrain", + recipe_name="llama_3b", + ), + ), + NeMoRunSlurmCommandGenStrategy, + ), } if request.param.startswith(("gpt-", "grok-", "nemo-run-", "nemo-launcher")): - return build_special_test_run(partial_tr, slurm_system, request.param, test_mapping) + tr, sbatch_file, run_script = build_special_test_run(partial_tr, slurm_system, request.param, test_mapping) + + if request.param == "nemo-run-vboost": + tr.test.extra_env_vars["ENABLE_VBOOST"] = "1" + + return tr, sbatch_file, run_script + if request.param in test_mapping: tr = test_mapping[request.param]() return tr, f"{request.param}.sbatch", None + raise ValueError(f"Unknown test: {request.param}")