Description
Hi,

Based on the tutorials, I've written an LLM pretraining script which works when I run it via the local executor on non-SLURM as well as single-node SLURM machines. I now want to run it on multiple nodes. To do so, I adapted the SLURM executor from the tutorial. Here is my code:
# Adapted from recipes in the NeMo repository
# Open container - docker run --gpus all --ulimit stack=6718846 --net=host --rm -it -v ${PWD}:/workspace -v /scratch/sws0/user/afkhan/Nemo_Work:/Storage -w /workspace nvcr.io/nvidia/nemo:25.02.rc7 bash
# docker run --gpus all --ulimit stack=6718846 --net=host --rm -it nvcr.io/nvidia/nemo:25.02.rc7 bash
# python pretrain_llama32_1b_dclm_v2.py
from typing import Callable, Optional
import time
import argparse
from dotenv import load_dotenv
import os
import wandb
import nemo_run as run
from nemo_run import Config
from megatron.core.distributed import DistributedDataParallelConfig
import torch
from nemo import lightning as nl
from nemo.collections.llm.api import pretrain
from nemo.collections.llm.gpt.model.llama import Llama32Config1B, LlamaModel
from nemo.collections.llm.recipes.optim.adam import distributed_fused_adam_with_cosine_annealing
from nemo.collections.llm.recipes.precision.mixed_precision import bf16_mixed
# from nemo.collections.llm.recipes.callbacks import straggler_det_callback
from nemo.collections import llm
from nemo.collections.common.tokenizers.huggingface.auto_tokenizer import AutoTokenizer
from nemo.lightning.pytorch.callbacks.garbage_collection import GarbageCollectionCallback
from nemo.utils.exp_manager import TimingCallback
from lightning.pytorch.loggers import TensorBoardLogger, WandbLogger
from lightning.pytorch.callbacks import Callback
import lightning.pytorch as pl
load_dotenv()
wandb.login(key=os.environ.get("WANDB_API_KEY"))
WANDB_PROJECT_NAME = "Nemo_Testing"
# SLURM RELATED
USER = "afkhan"
HOST = "sws-grid-submit.mpi-sws.org"
REMOTE_JOB_DIR = "/Storage/SLURM_Jobs"
ACCOUNT = "a100"
PARTITION = "a100"
CONTAINER_IMAGE = "nvcr.io/nvidia/nemo:25.02.rc7"
CUSTOM_MOUNTS = [
    "/NS/llm-pretraining/work/afkhan/USC_Colab/Nemo_Testing:/workspace",
    "/scratch/sws0/user/afkhan/Nemo_Work:/Storage",
]
# Logging and Checkpointing Related
LIMIT_VAL_BATCHES = 32
CHECKPOINT_EVERY_N_TRAIN_STEPS = 5
BATCHES_BETWEEN_VAL_CHECK_INTERVAL = 100
LOG_EVERY_N_STEPS = 10
CHECKPOINT_DIR_BASE_PATH = "/Storage/Intermediate_Checkpoints/"
# GPU and Node Related
NUM_GPUS = 2
NUM_NODES = 1
# Data and Batch Related
SEQUENCE_LENGTH = 2048
PER_GPU_BATCH_SIZE = 1
GRADIENT_ACCUMULATION_STEPS = 1
GLOBAL_BATCH_SIZE = PER_GPU_BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS * NUM_GPUS * NUM_NODES
# Data Loader Related
TOKENIZER_PRETRAINED_MODEL_NAME = "allenai/OLMo-1B-0724-hf"
TOKENIZER_VOCAB_FILE = "/workspace/Data/olmo_tokenizer/tokenizer-olmo-1b-0724.json"
DATA_PATH = "/Storage/Data_Artifacts/DCLM_Data/DCLM_Nemo/out_text_document"
INDEX_MAPPING_FILE_PATH = "/Storage/Data_Artifacts/DCLM_Data/Index_Folder/index_mapping_local_shard_0_1_2"
# Model Related
TIE_WEIGHTS = True
PERFORMANCE_MODE = True
# Run Name
NAME = "llama32_1b_dclm" + "-SL-" + str(SEQUENCE_LENGTH) + "-PGBS-" + str(PER_GPU_BATCH_SIZE) + "-GAS-" + str(GRADIENT_ACCUMULATION_STEPS) + "-NGPU-" + str(NUM_GPUS) + "-NNODES-" + str(NUM_NODES)
if TIE_WEIGHTS:
NAME += "-TW"
if PERFORMANCE_MODE:
NAME += "-PERF"
# BATCHES_PARSED_PER_STEP_PER_GPU = PER_GPU_BATCH_SIZE * GRADIENT_ACCUMULATION_STEPS
# BATCHES_BETWEEN_VAL_CHECK_INTERVAL = 500 * BATCHES_PARSED_PER_STEP_PER_GPU
def dclm(
    gbs,
    mbs,
    seq_length,
) -> run.Config[pl.LightningDataModule]:
    return run.Config(
        llm.PreTrainingDataModule,
        # paths=["Data/dclm_local_shard_1_megatron/concatenated.jsonl_text_document"],
        paths=[DATA_PATH],
        seq_length=seq_length,
        global_batch_size=gbs,
        micro_batch_size=mbs,
        tokenizer=run.Config(
            AutoTokenizer,
            pretrained_model_name=TOKENIZER_PRETRAINED_MODEL_NAME,
            vocab_file=TOKENIZER_VOCAB_FILE,
            use_fast=True,
        ),
        split="99,8,2",
        index_mapping_dir=INDEX_MAPPING_FILE_PATH,
        # num_workers=DATA_LOADER_WORKERS,
        # pin_memory=DATA_LOADER_PIN_MEMORY,
    )
@run.cli.factory(name=NAME)
def model() -> run.Config[pl.LightningModule]:
    """
    Factory function to create a Llama3.2 1B model configuration.

    Returns:
        run.Config[pl.LightningModule]: Configuration for the Llama3.2 1B model.

    Examples:
        CLI usage:
            $ nemo llm pretrain model=llama32_1b ...
        Python API usage:
            >>> model_config = model()
            >>> print(model_config)
    """
    conf = run.Config(Llama32Config1B)
    conf.seq_length = SEQUENCE_LENGTH
    conf.share_embeddings_and_output_weights = TIE_WEIGHTS
    return run.Config(LlamaModel, config=conf)
def trainer(
    tensor_parallelism: int = 1,
    pipeline_parallelism: int = 1,
    pipeline_parallelism_type: Optional[torch.dtype] = None,
    virtual_pipeline_parallelism: Optional[int] = None,
    context_parallelism: int = 1,
    sequence_parallelism: bool = False,
    num_nodes: int = 1,
    num_gpus_per_node: int = 8,
    max_steps: int = 1168251,
    callbacks: Optional[list[run.Config[Callback]]] = None,
) -> run.Config[nl.Trainer]:
    """
    Configure the NeMo Lightning Trainer for the Llama3.2 1B model.

    Args:
        tensor_parallelism (int): Degree of tensor model parallelism.
        pipeline_parallelism (int): Degree of pipeline model parallelism.
        pipeline_parallelism_type (Optional[torch.dtype]): Data type for pipeline parallelism.
        virtual_pipeline_parallelism (Optional[int]): Size of virtual pipeline parallelism.
        context_parallelism (int): Degree of context parallelism.
        sequence_parallelism (bool): Whether to use sequence parallelism.
        num_nodes (int): Number of compute nodes to use.
        num_gpus_per_node (int): Number of GPUs per node.
        max_steps (int): Maximum number of training steps.
        callbacks (Optional[list[run.Config[Callback]]]): List of callback configurations.

    Returns:
        run.Config[nl.Trainer]: Configuration for the NeMo Lightning Trainer.

    Examples:
        CLI usage:
            $ nemo llm pretrain trainer=llama32_1b ...
        Python API usage:
            >>> trainer_config = trainer(num_nodes=1, num_gpus_per_node=1)
            >>> print(trainer_config)

    Note:
        This configuration uses extensive parallelism to handle the large model size efficiently.
    """
    strategy = run.Config(
        nl.MegatronStrategy,
        tensor_model_parallel_size=tensor_parallelism,
        pipeline_model_parallel_size=pipeline_parallelism,
        pipeline_dtype=pipeline_parallelism_type,
        virtual_pipeline_model_parallel_size=virtual_pipeline_parallelism,
        context_parallel_size=context_parallelism,
        sequence_parallel=sequence_parallelism,
        gradient_as_bucket_view=True,
        ckpt_async_save=True,
        ckpt_parallel_load=True,
        ddp=run.Config(
            DistributedDataParallelConfig,
            check_for_nan_in_grad=True,
            grad_reduce_in_fp32=True,
            overlap_grad_reduce=True,
            overlap_param_gather=True,
            average_in_collective=True,
        ),
    )
    trainer = run.Config(
        nl.Trainer,
        accelerator="gpu",
        accumulate_grad_batches=GRADIENT_ACCUMULATION_STEPS,
        callbacks=callbacks,
        devices=num_gpus_per_node,
        # limit_test_batches=50,
        limit_val_batches=LIMIT_VAL_BATCHES,
        log_every_n_steps=LOG_EVERY_N_STEPS,
        max_steps=max_steps,
        num_nodes=num_nodes,
        plugins=bf16_mixed(),
        strategy=strategy,
        use_distributed_sampler=False,
        val_check_interval=BATCHES_BETWEEN_VAL_CHECK_INTERVAL,
    )
    return trainer
def tensorboard_logger(name: str, save_dir: str = "tb_logs") -> Config[TensorBoardLogger]:
    return Config(TensorBoardLogger, save_dir=save_dir, name=name)


def wandb_logger(project: str, name: str, entity: Optional[str] = None) -> Config[WandbLogger]:
    cfg = Config(
        WandbLogger,
        project=project,
        name=name,
        config={},
    )
    if entity:
        cfg.entity = entity
    return cfg
def default_log(
    dir: Optional[str] = None,
    name: str = "default",
    tensorboard_logger: Optional[Config[TensorBoardLogger]] = None,
    wandb_logger: Optional[Config[WandbLogger]] = None,
    monitor: str = "val_loss",
    save_top_k: int = 1,
    save_last: bool = True,
    checkpoint_every_n_train_steps: int = 1000,
) -> Config[nl.NeMoLogger]:
    ckpt = Config(
        nl.ModelCheckpoint,
        monitor=monitor,
        save_last=save_last,
        save_top_k=save_top_k,
        every_n_train_steps=checkpoint_every_n_train_steps,
        filename="{model_name}--{step}-{consumed_samples}",
    )
    return Config(
        nl.NeMoLogger,
        ckpt=ckpt,
        name=name,
        tensorboard=tensorboard_logger,
        wandb=wandb_logger,
        log_dir=dir,
    )
@run.cli.factory(target=pretrain, name=NAME)
def pretrain_recipe(
    dir: Optional[str] = None,
    name: str = "default",
    num_nodes: int = 1,
    num_gpus_per_node: int = 8,
    fn: Callable = pretrain,
) -> run.Partial:
    """
    Create a pre-training recipe for the Llama3.2 1B model.

    This function sets up a complete configuration for pre-training, including
    model, trainer, data, logging, optimization, and resumption settings.

    Args:
        dir (Optional[str]): Directory for saving logs and checkpoints.
        name (str): Name of the pre-training run.
        num_nodes (int): Number of compute nodes to use.
        num_gpus_per_node (int): Number of GPUs per node.
        fn (Callable): The pre-training function to use.

    Returns:
        run.Partial: Partial configuration for pre-training.

    Examples:
        CLI usage:
            $ nemo llm pretrain --factory llama32_1b
            $ nemo llm pretrain --factory "llama32_1b(num_nodes=1, name='my_1B_pretrain')"
        Python API usage:
            >>> recipe = pretrain_recipe(name="llama32_1b_pretrain", num_nodes=1)
            >>> print(recipe)

    Note:
        This recipe was adapted from the Llama3 8B recipe, which is optimized for a
        larger model and requires significant computational resources.
    """
    recipe = run.Partial(
        fn,
        model=model(),
        trainer=trainer(
            num_nodes=num_nodes,
            num_gpus_per_node=num_gpus_per_node,
            callbacks=[
                run.Config(TimingCallback, log_tokens_per_sec=True),
                # run.Config(straggler_det_callback, stop_if_detected_straggler=False),
            ],
        ),
        log=default_log(
            dir=dir,
            name=name,
            tensorboard_logger=tensorboard_logger(name=name),
            wandb_logger=wandb_logger(project=WANDB_PROJECT_NAME, name=name),
            monitor="global_step",
            save_top_k=-1,  # Save all checkpoints
            save_last=True,
            checkpoint_every_n_train_steps=CHECKPOINT_EVERY_N_TRAIN_STEPS,
        ),
        optim=distributed_fused_adam_with_cosine_annealing(max_lr=3e-4),
        resume=run.Config(nl.AutoResume, resume_if_exists=True, resume_ignore_no_checkpoint=True),
    )
    if PERFORMANCE_MODE:
        recipe = pretrain_performance_optimizations(recipe)
    return recipe
def pretrain_performance_optimizations(recipe: run.Partial) -> run.Partial:
    """
    Create a performance-optimized pre-training recipe for the Llama3.2 1B model
    (copied from the Llama3 8B recipe).

    This method enables performance optimizations that may not be suitable for all use cases.
    It builds upon the standard pre-training recipe and adds additional performance enhancements.

    Args:
        recipe (run.Partial): Base pre-train recipe to which performance optimizations will be added.

    Returns:
        run.Partial: Partial configuration for performance-optimized pre-training.

    Note:
        Use this method with caution and only when you need maximum performance.
        It may not be suitable for all hardware configurations or use cases.
    """
    if not recipe.trainer.callbacks:
        recipe.trainer.callbacks = []
    garbage_collection_callback = run.Config(
        GarbageCollectionCallback,
        gc_interval_train=100,
        gc_interval_val=100,
    )
    recipe.trainer.callbacks.extend(
        [
            garbage_collection_callback,
        ]
    )
    return recipe
# Executor for running pretraining
def local_executor_torchrun(nodes: int = 1, ntasks_per_node: int = 2) -> run.LocalExecutor:
    executor = run.LocalExecutor(
        # nodes=nodes,
        ntasks_per_node=ntasks_per_node,
        launcher="torchrun",
    )
    return executor
def slurm_executor(
    user: str,
    host: str,
    remote_job_dir: str,
    account: str,
    partition: str,
    nodes: int,
    devices: int,
    time: str = "01:00:00",
    custom_mounts: Optional[list[str]] = None,
    custom_env_vars: Optional[dict[str, str]] = None,
    container_image: str = "nvcr.io/nvidia/nemo:dev",
    retries: int = 0,
) -> run.SlurmExecutor:
    if not (user and host and remote_job_dir and account and partition and nodes and devices):
        raise RuntimeError(
            "Please set user, host, remote_job_dir, account, partition, nodes and devices args for using this function."
        )
    mounts = []
    # Custom mounts are defined here.
    if custom_mounts:
        mounts.extend(custom_mounts)
    # Env vars for jobs are configured here.
    env_vars = {
        "TRANSFORMERS_OFFLINE": "1",
        "TORCH_NCCL_AVOID_RECORD_STREAMS": "1",
        "NCCL_NVLS_ENABLE": "0",
        "NVTE_DP_AMAX_REDUCE_INTERVAL": "0",
        "NVTE_ASYNC_AMAX_REDUCTION": "1",
        "NVTE_FUSED_ATTN": "0",
    }
    if custom_env_vars:
        env_vars |= custom_env_vars
    # This defines the SLURM executor.
    # We connect to the executor via the tunnel defined by user, host and remote_job_dir.
    executor = run.SlurmExecutor(
        account=account,
        partition=partition,
        tunnel=run.SSHTunnel(
            user=user,
            host=host,
            job_dir=remote_job_dir,  # This is where the results of the run will be stored by default.
            # identity="/path/to/identity/file",  # OPTIONAL: path to a private key to establish the SSH connection without entering your password.
        ),
        nodes=nodes,
        ntasks_per_node=devices,
        gpus_per_node=devices,
        mem="0",
        exclusive=True,
        gres="gpu:8",
        packager=run.Packager(),
    )
    executor.container_image = container_image
    executor.container_mounts = mounts
    executor.env_vars = env_vars
    executor.retries = retries
    executor.time = time
    return executor
# This condition is necessary for the script to be compatible with Python's multiprocessing module.
if __name__ == "__main__":
    curr_date_time = time.strftime("%Y-%m-%d-%H-%M-%S")
    print(NAME + "-" + curr_date_time)
    # DIR = "Checkpoints/llama32_1b_dclm-SL-2048-PGBS-16-GAS-4-NGPU-8-NNODES-1-TW-DATA-DW8-PM-True-PERF-2025-02-18-19-47-54"
    # NAME = "llama32_1b_dclm-SL-2048-PGBS-16-GAS-4-NGPU-8-NNODES-1-TW-DATA-DW8-PM-True-PERF"
    # curr_date_time = "Resume-Testing-Run"
    CHECKPOINT_DIR = CHECKPOINT_DIR_BASE_PATH + NAME + "-" + curr_date_time
    recipe = pretrain_recipe(name=NAME, num_nodes=NUM_NODES, num_gpus_per_node=NUM_GPUS, dir=CHECKPOINT_DIR)
    recipe.data = dclm(gbs=GLOBAL_BATCH_SIZE, mbs=PER_GPU_BATCH_SIZE, seq_length=SEQUENCE_LENGTH)
    # executor = local_executor_torchrun(nodes=NUM_NODES, ntasks_per_node=NUM_GPUS)
    # run.run(recipe, executor=executor)
    executor = slurm_executor(
        user=USER,  # TODO: Set the username you want to use
        host=HOST,  # TODO: Set the host of your cluster
        remote_job_dir=REMOTE_JOB_DIR,  # TODO: Set the directory on the cluster where you want to save results
        account=ACCOUNT,  # TODO: Set the account for your cluster
        partition=PARTITION,  # TODO: Set the partition for your cluster
        container_image=CONTAINER_IMAGE,  # TODO: Set the container image you want to use for your job
        custom_mounts=CUSTOM_MOUNTS,  # TODO: Set any custom mounts
        # custom_env_vars={},  # TODO: Set any custom env vars
        nodes=recipe.trainer.num_nodes,
        devices=recipe.trainer.devices,
    )
    run.run(recipe, executor=executor, detach=True)
When I run this, I run it inside a Docker container, as otherwise I get import issues. So I first run this command:

docker run --gpus all --ulimit stack=6718846 --net=host --rm -it -v ${PWD}:/workspace -v /scratch/sws0/user/afkhan/Nemo_Work:/Storage -w /workspace nvcr.io/nvidia/nemo:25.02.rc7 bash
When I then run the above file using python file_name.py, I am prompted for the SSH password. Once I provide it, I get the following error:
Traceback (most recent call last):
  File "/workspace/pretrain_llama32_1b_dclm_v2.py", line 474, in <module>
    run.run(recipe, executor=executor, detach=True)
  File "/opt/NeMo-Run/src/nemo_run/run/api.py", line 85, in run
    exp.run(detach=detach)
  File "/opt/NeMo-Run/src/nemo_run/run/experiment.py", line 690, in run
    rsync(tunnel.session, self._exp_dir, os.path.dirname(tunnel.job_dir))
  File "/opt/NeMo-Run/src/nemo_run/core/tunnel/rsync.py", line 90, in rsync
    c.run(f"mkdir -p {target}", hide=hide_output)
  File "/usr/local/lib/python3.12/dist-packages/decorator.py", line 232, in fun
    return caller(func, *(extras + args), **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/fabric/connection.py", line 23, in opens
    return method(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/fabric/connection.py", line 763, in run
    return self._run(self._remote_runner(), command, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/invoke/context.py", line 113, in _run
    return runner.run(command, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/fabric/runners.py", line 83, in run
    return super().run(command, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/invoke/runners.py", line 395, in run
    return self._run_body(command, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/invoke/runners.py", line 451, in _run_body
    return self.make_promise() if self._asynchronous else self._finish()
                                                          ^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/dist-packages/invoke/runners.py", line 518, in _finish
    raise UnexpectedExit(result)
invoke.exceptions.UnexpectedExit: Encountered a bad command exit code!

Command: 'mkdir -p /Storage/SLURM_Jobs/nemo.collections.llm.api.pretrain'

Exit code: 1

Stdout:

Stderr:

mkdir: cannot create directory ‘/Storage’: Permission denied
I am not sure what I am doing wrong here. I do need the mounts so the job can run inside the SLURM session, right? Also, what is the right way to use the SLURM executor?
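For context, if I am reading the traceback correctly, NeMo-Run rsyncs the experiment directory to remote_job_dir over the SSH tunnel before submitting the job, so the failing step seems to amount to running the following on the login node itself, where /Storage presumably only exists as a mount inside my container (a sketch of my understanding, not verified):

# Rough equivalent of the failing step, reconstructed from the traceback:
ssh afkhan@sws-grid-submit.mpi-sws.org 'mkdir -p /Storage/SLURM_Jobs/nemo.collections.llm.api.pretrain'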