diff --git a/.gitmodules b/.gitmodules
index 796f7b17c3..81d066b8b0 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -1,7 +1,7 @@
 [submodule "3rdparty/Megatron-LM"]
 	path = 3rdparty/Megatron-LM-workspace/Megatron-LM
-	url = https://github.com/terrykong/Megatron-LM.git
-	branch = yuya/nemo-rl-use-dev
+	url = https://github.com/yaoyu-33/Megatron-LM.git
+	branch = main
 	shallow = true
 [submodule "3rdparty/Megatron-Bridge"]
 	path = 3rdparty/Megatron-Bridge-workspace/Megatron-Bridge
diff --git a/3rdparty/Megatron-Bridge-workspace/Megatron-Bridge b/3rdparty/Megatron-Bridge-workspace/Megatron-Bridge
index 1e9a459b43..15398e08fc 160000
--- a/3rdparty/Megatron-Bridge-workspace/Megatron-Bridge
+++ b/3rdparty/Megatron-Bridge-workspace/Megatron-Bridge
@@ -1 +1 @@
-Subproject commit 1e9a459b43aa1f62ca1356e554d2b0196ebdd546
+Subproject commit 15398e08fc86be3de084c7382116527246ab1852
diff --git a/3rdparty/Megatron-Bridge-workspace/setup.py b/3rdparty/Megatron-Bridge-workspace/setup.py
index 9aec2e6481..a0beea9449 100644
--- a/3rdparty/Megatron-Bridge-workspace/setup.py
+++ b/3rdparty/Megatron-Bridge-workspace/setup.py
@@ -26,8 +26,9 @@
 bridge_package_name = "megatron.bridge"

 CACHED_DEPENDENCIES = [
-    "transformers>=4.57.1",
+    "transformers<5.0.0",
     "datasets",
+    "accelerate",
     "omegaconf>=2.3.0",
     "tensorboard>=2.19.0",
     "typing-extensions",
@@ -40,7 +41,7 @@
     "hydra-core>1.3,<=1.3.2",
     "megatron-core[dev,mlm]>=0.15.0a0,<0.17.0",
     "qwen-vl-utils",
-    "transformer-engine[pytorch]>=2.9.0a0,<2.10.0",
+    "transformer-engine[pytorch]>=2.10.0a0,<2.12.0",
     "mamba-ssm",
     "nvidia-resiliency-ext",
     "causal-conv1d",
diff --git a/3rdparty/Megatron-LM-workspace/Megatron-LM b/3rdparty/Megatron-LM-workspace/Megatron-LM
index b73ae5cdab..193463c4f8 160000
--- a/3rdparty/Megatron-LM-workspace/Megatron-LM
+++ b/3rdparty/Megatron-LM-workspace/Megatron-LM
@@ -1 +1 @@
-Subproject commit b73ae5cdab9d409fcface2b2f3c375710abe6911
+Subproject commit 193463c4f8414e6906a40dd527a450bca50706b1
diff --git a/3rdparty/Megatron-LM-workspace/setup.py b/3rdparty/Megatron-LM-workspace/setup.py
index 0a088b393e..fb0a7cf92e 100644
--- a/3rdparty/Megatron-LM-workspace/setup.py
+++ b/3rdparty/Megatron-LM-workspace/setup.py
@@ -44,30 +44,31 @@
 CACHED_DEPENDENCIES = [
     # Default dependencies from pyproject.toml
     "torch",
-    "numpy<2.0.0",
+    "numpy",
     "packaging>=24.2",
     # Dev dependencies from pyproject.toml
-    "nvidia-modelopt[torch]>=0.33.0a0,<0.34.0; sys_platform != 'darwin'",
-    "transformer-engine[pytorch]>=2.9.0a0,<2.10.0",
-    "nvidia-resiliency-ext>=0.4.0a0,<0.5.0",
+    "nvidia-modelopt[torch]; sys_platform != 'darwin'",
+    "transformer-engine[pytorch,core_cu13]>=2.9.0a0,<2.12.0",
+    "nvidia-resiliency-ext",
     "tqdm",
     "einops~=0.8",
     "tensorstore~=0.1,!=0.1.46,!=0.1.72",
     "nvtx~=0.2",
     "multi-storage-client~=0.27",
     "opentelemetry-api~=1.33.1",
-    "setuptools<80.0.0",
     "mamba-ssm~=2.2",
     "causal-conv1d~=1.5",
+    "flash-linear-attention~=0.3.2",
     "nv-grouped-gemm~=1.1",
     "megatron-energon[av_decode]~=6.0",
-    "av<16.0.0",
-    "flashinfer-python",
+    "av",
+    "flashinfer-python~=0.5.0",
     "wget",
     "onnxscript",
-    "flash-linear-attention~=0.3.2",
     # VCS dependency - must match pyproject.toml [tool.uv.sources]
     "emerging_optimizers @ git+https://github.com/NVIDIA-NeMo/Emerging-Optimizers.git@v0.1.0",
+    "datasets",
+    "fastapi~=0.50",
 ]
diff --git a/examples/configs/grpo_math_1B_megatron.yaml b/examples/configs/grpo_math_1B_megatron.yaml
index a9368481ae..b240c6519c 100644
--- a/examples/configs/grpo_math_1B_megatron.yaml
+++ b/examples/configs/grpo_math_1B_megatron.yaml
@@ -148,11 +148,10 @@ policy:
     top_k: null
     mcore_generation_config:
       buffer_size_gb: 20 # Total GPU memory (in GB) allocated for KV cache buffers
-      buffer_guaranteed_fraction: 0.1 # Fraction of buffer reserved for guaranteed active requests
       num_cuda_graphs: 16 # Number of CUDA graphs to pre-compile for different batch sizes
       block_size_tokens: 256 # Size of each KV cache block in tokens (affects memory granularity)
       use_cuda_graphs_for_non_decode_steps: true # Enable CUDA graphs for prefill/context processing
-      enable_chunked_prefill: true # Split long prefills into chunks for better memory management
+      enable_chunked_prefill: false # Split long prefills into chunks for better memory management
       unified_memory_level: 0 # Unified memory usage level (0=disabled, higher values enable more aggressive paging)
       max_tokens: 16384 # Maximum number of tokens to use in a single step. Analogous to vllm's max_num_batched_tokens
diff --git a/nemo_rl/models/megatron/config.py b/nemo_rl/models/megatron/config.py
index 7e249affcd..5838e9d430 100644
--- a/nemo_rl/models/megatron/config.py
+++ b/nemo_rl/models/megatron/config.py
@@ -25,8 +25,6 @@ class MegatronGenerationConfig(TypedDict):
     # Total GPU memory (in GB) allocated for KV cache buffers
     buffer_size_gb: int
-    # Fraction of buffer reserved for guaranteed active requests
-    buffer_guaranteed_fraction: float
     # Number of CUDA graphs to pre-compile for different batch sizes
     num_cuda_graphs: int
     # Size of each KV cache block in tokens (affects memory granularity)
diff --git a/nemo_rl/models/megatron/setup.py b/nemo_rl/models/megatron/setup.py
index 24bfdb0605..e9fc2da9e1 100644
--- a/nemo_rl/models/megatron/setup.py
+++ b/nemo_rl/models/megatron/setup.py
@@ -51,6 +51,7 @@
 from megatron.bridge.utils.instantiate_utils import InstantiationMode
 from megatron.bridge.utils.vocab_utils import calculate_padded_vocab_size
 from megatron.core import parallel_state
+from megatron.core.process_groups_config import ProcessGroupCollection
 from megatron.core.transformer import MegatronModule
 from megatron.core.transformer.module import Float16Module
 from megatron.core.transformer.transformer_config import TransformerConfig
@@ -663,12 +664,15 @@ def setup_model_and_optimizer(
     checkpointing_context = init_checkpointing_context(megatron_cfg.checkpoint)

     # Tokenizer
+    if megatron_cfg.tokenizer.hf_tokenizer_kwargs is None:
+        megatron_cfg.tokenizer.hf_tokenizer_kwargs = {}
+    megatron_cfg.tokenizer.hf_tokenizer_kwargs["trust_remote_code"] = True
+    megatron_cfg.tokenizer.hf_tokenizer_kwargs["use_fast"] = True
     build_tokenizer(
         megatron_cfg.tokenizer,
         make_vocab_size_divisible_by=megatron_cfg.model.make_vocab_size_divisible_by
         // megatron_cfg.model.tensor_model_parallel_size,
         tensor_model_parallel_size=megatron_cfg.model.tensor_model_parallel_size,
-        trust_remote_code=True,
     )

     assert megatron_cfg.model.vocab_size, "vocab size must be specified in model config"
@@ -731,6 +735,8 @@ def composed_peft_hook(model: list[MegatronModule]) -> list[MegatronModule]:
         pre_wrap_hook.extend([composed_peft_hook])

     # Model, optimizer, and learning rate.
+    pg_collection = ProcessGroupCollection.use_mpu_process_groups()
+    setattr(megatron_cfg.model, "_pg_collection", pg_collection)
     model = get_model(
         megatron_cfg.model,
         megatron_cfg.ddp,
@@ -739,6 +745,7 @@ def composed_peft_hook(model: list[MegatronModule]) -> list[MegatronModule]:
         data_parallel_random_init=megatron_cfg.rng.data_parallel_random_init,
         pre_wrap_hook=pre_wrap_hook,
         mixed_precision_wrapper=mixed_precision_wrapper,
+        pg_collection=pg_collection,
     )
     if load_optimizer:
         optimizer, scheduler = setup_optimizer(
@@ -872,6 +879,7 @@ def setup_reference_model_state(
         overlap_param_gather_with_optimizer_step=megatron_cfg.optimizer.overlap_param_gather_with_optimizer_step,
         pre_wrap_hook=megatron_cfg.rng.data_parallel_random_init,
         mixed_precision_wrapper=ref_mixed_precision_wrapper,
+        pg_collection=ProcessGroupCollection.use_mpu_process_groups(),
     )

     print("Loading the Reference Model")
@@ -925,11 +933,16 @@ def finalize_megatron_setup(
         megatron_cfg.ddp,
         optimizer,
         align_grad_reduce=megatron_cfg.dist.align_grad_reduce,
+        pg_collection=ProcessGroupCollection.use_mpu_process_groups(),
     )

     tokenizer_config = TokenizerConfig(
         tokenizer_type="HuggingFaceTokenizer",
         tokenizer_model=hf_model_name,
+        hf_tokenizer_kwargs={
+            "trust_remote_code": True,
+            "use_fast": True,
+        },
     )

     megatron_tokenizer = build_tokenizer(
@@ -937,7 +950,6 @@ def finalize_megatron_setup(
         make_vocab_size_divisible_by=megatron_cfg.model.make_vocab_size_divisible_by
         // config["megatron_cfg"]["tensor_model_parallel_size"],
         tensor_model_parallel_size=config["megatron_cfg"]["tensor_model_parallel_size"],
-        trust_remote_code=True,
     )

     dp_size = worker_sharding_annotations.get_axis_size("data_parallel")
diff --git a/nemo_rl/models/policy/workers/megatron_policy_worker.py b/nemo_rl/models/policy/workers/megatron_policy_worker.py
index 798c4ea00a..e1fcc27e65 100644
--- a/nemo_rl/models/policy/workers/megatron_policy_worker.py
+++ b/nemo_rl/models/policy/workers/megatron_policy_worker.py
@@ -27,6 +27,7 @@
     maybe_finalize_async_save,
     save_checkpoint,
 )
+from megatron.bridge.training.utils.pg_utils import get_pg_collection
 from megatron.bridge.training.utils.train_utils import (
     logical_and_across_model_parallel_group,
     reduce_max_stat_across_model_parallel_group,
@@ -415,18 +416,20 @@ def train(
            else:
                update_successful, grad_norm, num_zeros_in_grad = (True, 0.0, 0.0)

+            pg_collection = get_pg_collection(self.model)
+
            # when freezing sub-models we may have a mixture of successful and unsucessful ranks,
            # so we must gather across mp ranks
            update_successful = logical_and_across_model_parallel_group(
-                update_successful
+                update_successful, mp_group=pg_collection.mp
            )
            # grad_norm and num_zeros_in_grad will be None on ranks without trainable params,
            # so we must gather across mp ranks
            grad_norm: float = reduce_max_stat_across_model_parallel_group(
-                grad_norm
+                grad_norm, mp_group=pg_collection.mp
            )
            num_zeros_in_grad: float = reduce_max_stat_across_model_parallel_group(
-                num_zeros_in_grad
+                num_zeros_in_grad, mp_group=pg_collection.mp
            )

            if update_successful:
@@ -1036,9 +1039,6 @@ def generate(
         ]
         enable_chunked_prefill = mcore_generation_config["enable_chunked_prefill"]
         unified_memory_level = mcore_generation_config["unified_memory_level"]
-        buffer_guaranteed_fraction = mcore_generation_config[
-            "buffer_guaranteed_fraction"
-        ]
         max_tokens = mcore_generation_config["max_tokens"]

         model_config = self.model.config
@@ -1050,7 +1050,6 @@ def generate(
             kv_channels=model_config.kv_channels,
             num_attention_heads=model_config.num_query_groups,
             max_sequence_length=self.cfg["generation"]["max_new_tokens"],
-            buffer_guaranteed_fraction=buffer_guaranteed_fraction,
             buffer_size_gb=buffer_size_gb,
             materialize_only_last_token_logits=False,
             num_cuda_graphs=num_cuda_graphs,
@@ -1061,7 +1060,7 @@
             use_cuda_graphs_for_non_decode_steps=use_cuda_graphs_for_non_decode_steps,
             use_flashinfer_fused_rope=False,
             unified_memory_level=unified_memory_level,
-            max_tokens_override=max_tokens,
+            max_tokens=max_tokens,
         )
         inference_wrapped_model = GPTInferenceWrapper(
             self.model, inference_wrapper_config, dynamic_context
@@ -1134,23 +1133,27 @@
         result = []
         while dynamic_engine.has_unfinished_requests():
-            result_step = dynamic_engine.step_modern(verbose=False)
-            finished_requests = result_step.get("finished_requests", [])
-            for finished_request in finished_requests:
-                result.append(finished_request)
+            result_step = dynamic_engine.step_modern()
+            result.extend(result_step["finished_request_records"])

         # Sort results by request_id to maintain original batch order
         result.sort(key=lambda x: x.request_id)

         out = {
-            "tokens": [x.prompt_tokens.tolist() + x.generated_tokens for x in result],
-            "logprobs": [x.prompt_log_probs + x.generated_log_probs for x in result],
+            "tokens": [
+                x.requests[0].prompt_tokens.tolist() + x.requests[0].generated_tokens
+                for x in result
+            ],
+            "logprobs": [
+                x.requests[0].prompt_log_probs + x.requests[0].generated_log_probs
+                for x in result
+            ],
         }

         input_lengths = data["input_lengths"]

         # pad the out "tokens" and "logprobs" and make them into tensors from lists
         batch_size = data["input_ids"].size(0)
-        max_gen_seq_len = max([len(x.generated_tokens) for x in result])
+        max_gen_seq_len = max([len(x.requests[0].generated_tokens) for x in result])
         padded_input_length = input_ids.size(1)
         max_seq_len = padded_input_length + max_gen_seq_len
diff --git a/tests/unit/models/megatron/test_megatron_setup.py b/tests/unit/models/megatron/test_megatron_setup.py
index 61c4bc7a75..16d77389a6 100644
--- a/tests/unit/models/megatron/test_megatron_setup.py
+++ b/tests/unit/models/megatron/test_megatron_setup.py
@@ -920,6 +920,7 @@ def test_reinitialize_parallel_state_after_import(
 class TestSetupModelAndOptimizer:
     """Tests for setup_model_and_optimizer function."""

+    @patch("nemo_rl.models.megatron.setup.ProcessGroupCollection")
     @patch("nemo_rl.models.megatron.setup.GlobalState")
     @patch("nemo_rl.models.megatron.setup.initialize_megatron")
     @patch("nemo_rl.models.megatron.setup.set_jit_fusion_options")
@@ -946,6 +947,7 @@ def test_setup_with_param_sync_and_frozen_moe_router(
         mock_set_jit,
         mock_init_megatron,
         mock_global_state,
+        mock_pg_collection,
     ):
         """Test setup_model_and_optimizer with MoE router freezing."""
         from nemo_rl.models.megatron.setup import setup_model_and_optimizer
@@ -1006,6 +1008,7 @@
 class TestSetupReferenceModelState:
     """Tests for setup_reference_model_state function."""

+    @patch("nemo_rl.models.megatron.setup.ProcessGroupCollection")
     @patch("nemo_rl.models.megatron.setup.init_checkpointing_context")
     @patch("nemo_rl.models.megatron.setup.GlobalState")
     @patch("nemo_rl.models.megatron.setup.get_model")
@@ -1019,6 +1022,7 @@ def test_setup_reference_model(
         mock_get_model,
         mock_global_state,
         mock_init_ckpt_context,
+        mock_pg_collection,
         capsys,
     ):
         """Test setup_reference_model_state when checkpoint exists."""
@@ -1075,6 +1079,7 @@
 class TestFinalizeMegatronSetup:
     """Tests for finalize_megatron_setup function."""

+    @patch("nemo_rl.models.megatron.setup.ProcessGroupCollection")
@patch("nemo_rl.models.megatron.setup.ProcessGroupCollection") @patch("nemo_rl.models.megatron.setup._update_model_config_funcs") @patch("nemo_rl.models.megatron.setup.build_tokenizer") @patch("nemo_rl.models.megatron.setup.AutoBridge") @@ -1083,6 +1088,7 @@ def test_basic_finalize_setup( mock_auto_bridge, mock_build_tokenizer, mock_update_model_config, + mock_pg_collection, ): """Test basic finalize_megatron_setup.""" from nemo_rl.models.megatron.setup import finalize_megatron_setup diff --git a/tests/unit/models/policy/test_megatron_worker.py b/tests/unit/models/policy/test_megatron_worker.py index 426c64a0d1..7d329ab411 100644 --- a/tests/unit/models/policy/test_megatron_worker.py +++ b/tests/unit/models/policy/test_megatron_worker.py @@ -87,14 +87,13 @@ def create_megatron_test_config( "stop_token_ids": None, "stop_strings": None, "mcore_generation_config": { - "buffer_size_gb": 20, - "buffer_guaranteed_fraction": 0.1, + "buffer_size_gb": 2, "num_cuda_graphs": 16, - "block_size_tokens": 256, + "block_size_tokens": 1024, "use_cuda_graphs_for_non_decode_steps": True, "enable_chunked_prefill": True, "unified_memory_level": 0, - "max_tokens": 16384, + "max_tokens": 65536, }, "colocated": { "enabled": True, diff --git a/uv.lock b/uv.lock index 6a745339ed..587f849732 100644 --- a/uv.lock +++ b/uv.lock @@ -3589,6 +3589,7 @@ wheels = [ name = "megatron-bridge" source = { editable = "3rdparty/Megatron-Bridge-workspace" } dependencies = [ + { name = "accelerate" }, { name = "causal-conv1d" }, { name = "datasets" }, { name = "flash-linear-attention" }, @@ -3614,6 +3615,7 @@ dependencies = [ [package.metadata] requires-dist = [ + { name = "accelerate" }, { name = "causal-conv1d", git = "https://github.com/Dao-AILab/causal-conv1d?rev=67e0a9dfe1518fc0036444e9ab5fe06ab78299e0" }, { name = "datasets" }, { name = "flash-linear-attention" }, @@ -3631,8 +3633,8 @@ requires-dist = [ { name = "tensorboard", specifier = ">=2.19.0" }, { name = "timm" }, { name = "tqdm", specifier = ">=4.67.1" }, - { name = "transformer-engine", extras = ["pytorch"], specifier = ">=2.9.0a0,<2.10.0" }, - { name = "transformers", specifier = ">=4.57.1" }, + { name = "transformer-engine", extras = ["pytorch"], specifier = ">=2.10.0a0,<2.12.0" }, + { name = "transformers", specifier = "<5.0.0" }, { name = "typing-extensions" }, { name = "wandb", specifier = ">=0.19.10" }, ] @@ -3643,8 +3645,10 @@ source = { editable = "3rdparty/Megatron-LM-workspace" } dependencies = [ { name = "av" }, { name = "causal-conv1d" }, + { name = "datasets" }, { name = "einops" }, { name = "emerging-optimizers" }, + { name = "fastapi" }, { name = "flash-linear-attention" }, { name = "flashinfer-python", version = "0.5.2", source = { registry = "https://pypi.org/simple" }, marker = "extra == 'extra-7-nemo-rl-vllm' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang')" }, { name = "flashinfer-python", version = "0.5.3", source = { registry = "https://pypi.org/simple" }, marker = "extra == 'extra-7-nemo-rl-sglang' or extra != 'extra-7-nemo-rl-vllm'" }, @@ -3659,7 +3663,6 @@ dependencies = [ { name = "onnxscript" }, { name = "opentelemetry-api" }, { name = "packaging" }, - { name = "setuptools" }, { name = "tensorstore", version = "0.1.74", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.13' or (extra == 'extra-7-nemo-rl-automodel' and extra == 
     { name = "tensorstore", version = "0.1.76", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.13' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-sglang' and extra == 'extra-7-nemo-rl-vllm')" },
     { name = "torch", version = "2.9.0", source = { registry = "https://pypi.org/simple" }, marker = "sys_platform == 'darwin' or (extra == 'extra-7-nemo-rl-automodel' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-fsdp' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-mcore' and extra == 'extra-7-nemo-rl-sglang') or (extra == 'extra-7-nemo-rl-sglang' and extra == 'extra-7-nemo-rl-vllm')" },
@@ -3671,29 +3674,30 @@

 [package.metadata]
 requires-dist = [
-    { name = "av", specifier = "<16.0.0" },
+    { name = "av" },
     { name = "causal-conv1d", git = "https://github.com/Dao-AILab/causal-conv1d?rev=67e0a9dfe1518fc0036444e9ab5fe06ab78299e0" },
+    { name = "datasets" },
     { name = "einops", specifier = "~=0.8" },
     { name = "emerging-optimizers", git = "https://github.com/NVIDIA-NeMo/Emerging-Optimizers.git?rev=v0.1.0" },
+    { name = "fastapi", specifier = "~=0.50" },
     { name = "flash-linear-attention", specifier = "~=0.3.2" },
-    { name = "flashinfer-python" },
+    { name = "flashinfer-python", specifier = "~=0.5.0" },
     { name = "mamba-ssm", git = "https://github.com/state-spaces/mamba.git?rev=d68d16ed7d5d5164eb5a57c0285f3b7eb8394ec1" },
     { name = "megatron-energon", extras = ["av-decode"], specifier = "~=6.0" },
     { name = "multi-storage-client", specifier = "~=0.27" },
-    { name = "numpy", specifier = "<2.0.0" },
+    { name = "numpy" },
     { name = "nv-grouped-gemm", git = "https://github.com/fanshiqing/grouped_gemm?tag=v1.1.4.post7" },
-    { name = "nvidia-modelopt", extras = ["torch"], marker = "sys_platform != 'darwin'", specifier = ">=0.33.0a0,<0.34.0" },
-    { name = "nvidia-resiliency-ext", specifier = ">=0.4.0a0,<0.5.0" },
+    { name = "nvidia-modelopt", extras = ["torch"], marker = "sys_platform != 'darwin'" },
+    { name = "nvidia-resiliency-ext" },
     { name = "nvtx", specifier = "~=0.2" },
     { name = "onnxscript" },
     { name = "opentelemetry-api", specifier = "~=1.33.1" },
     { name = "packaging", specifier = ">=24.2" },
-    { name = "setuptools", specifier = "<80.0.0" },
     { name = "tensorstore", specifier = "~=0.1,!=0.1.46,!=0.1.72" },
     { name = "torch", marker = "sys_platform != 'darwin'", index = "https://download.pytorch.org/whl/cu129" },
     { name = "torch", marker = "sys_platform == 'darwin'", index = "https://pypi.org/simple" },
     { name = "tqdm" },
-    { name = "transformer-engine", extras = ["pytorch"], specifier = ">=2.9.0a0,<2.10.0" },
+    { name = "transformer-engine", extras = ["core-cu13", "pytorch"], specifier = ">=2.9.0a0,<2.12.0" },
     { name = "wget" },
 ]