4 changes: 2 additions & 2 deletions .gitmodules
@@ -1,7 +1,7 @@
[submodule "3rdparty/Megatron-LM"]
path = 3rdparty/Megatron-LM-workspace/Megatron-LM
url = https://github.com/terrykong/Megatron-LM.git
branch = yuya/nemo-rl-use-dev
url = https://github.com/yaoyu-33/Megatron-LM.git
branch = main
shallow = true
[submodule "3rdparty/Megatron-Bridge"]
path = 3rdparty/Megatron-Bridge-workspace/Megatron-Bridge
2 changes: 1 addition & 1 deletion 3rdparty/Megatron-Bridge-workspace/Megatron-Bridge
Submodule Megatron-Bridge updated 384 files
5 changes: 3 additions & 2 deletions 3rdparty/Megatron-Bridge-workspace/setup.py
@@ -26,8 +26,9 @@
bridge_package_name = "megatron.bridge"

CACHED_DEPENDENCIES = [
"transformers>=4.57.1",
"transformers<5.0.0",
"datasets",
"accelerate",
"omegaconf>=2.3.0",
"tensorboard>=2.19.0",
"typing-extensions",
@@ -40,7 +41,7 @@
"hydra-core>1.3,<=1.3.2",
"megatron-core[dev,mlm]>=0.15.0a0,<0.17.0",
"qwen-vl-utils",
"transformer-engine[pytorch]>=2.9.0a0,<2.10.0",
"transformer-engine[pytorch]>=2.10.0a0,<2.12.0",
"mamba-ssm",
"nvidia-resiliency-ext",
"causal-conv1d",
2 changes: 1 addition & 1 deletion 3rdparty/Megatron-LM-workspace/Megatron-LM
Submodule Megatron-LM updated 966 files
17 changes: 9 additions & 8 deletions 3rdparty/Megatron-LM-workspace/setup.py
@@ -44,30 +44,31 @@
CACHED_DEPENDENCIES = [
# Default dependencies from pyproject.toml
"torch",
"numpy<2.0.0",
"numpy",
"packaging>=24.2",
# Dev dependencies from pyproject.toml
"nvidia-modelopt[torch]>=0.33.0a0,<0.34.0; sys_platform != 'darwin'",
"transformer-engine[pytorch]>=2.9.0a0,<2.10.0",
"nvidia-resiliency-ext>=0.4.0a0,<0.5.0",
"nvidia-modelopt[torch]; sys_platform != 'darwin'",
"transformer-engine[pytorch,core_cu13]>=2.9.0a0,<2.12.0",
"nvidia-resiliency-ext",
"tqdm",
"einops~=0.8",
"tensorstore~=0.1,!=0.1.46,!=0.1.72",
"nvtx~=0.2",
"multi-storage-client~=0.27",
"opentelemetry-api~=1.33.1",
"setuptools<80.0.0",
"mamba-ssm~=2.2",
"causal-conv1d~=1.5",
"flash-linear-attention~=0.3.2",
"nv-grouped-gemm~=1.1",
"megatron-energon[av_decode]~=6.0",
"av<16.0.0",
"flashinfer-python",
"av",
"flashinfer-python~=0.5.0",
"wget",
"onnxscript",
"flash-linear-attention~=0.3.2",
# VCS dependency - must match pyproject.toml [tool.uv.sources]
"emerging_optimizers @ git+https://github.com/NVIDIA-NeMo/[email protected]",
"datasets",
"fastapi~=0.50",
]


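The Megatron-LM dependency list above mostly relaxes pins (numpy, nvidia-modelopt, nvidia-resiliency-ext, av) while widening transformer-engine to <2.12.0 and pinning flashinfer-python to ~=0.5.0. A minimal sketch of a post-rebuild sanity check, assuming these distribution names match what actually gets installed:

    # Sketch: spot-check a rebuilt environment against the widened pins (distribution names assumed).
    from importlib.metadata import PackageNotFoundError, version

    EXPECTED = {
        "transformer-engine": ">=2.9.0a0,<2.12.0",
        "flashinfer-python": "~=0.5.0",
        "numpy": "unpinned",
    }

    for dist, spec in EXPECTED.items():
        try:
            print(f"{dist}: installed {version(dist)} (spec: {spec})")
        except PackageNotFoundError:
            print(f"{dist}: not installed")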
3 changes: 1 addition & 2 deletions examples/configs/grpo_math_1B_megatron.yaml
@@ -148,11 +148,10 @@ policy:
top_k: null
mcore_generation_config:
buffer_size_gb: 20 # Total GPU memory (in GB) allocated for KV cache buffers
buffer_guaranteed_fraction: 0.1 # Fraction of buffer reserved for guaranteed active requests
num_cuda_graphs: 16 # Number of CUDA graphs to pre-compile for different batch sizes
block_size_tokens: 256 # Size of each KV cache block in tokens (affects memory granularity)
use_cuda_graphs_for_non_decode_steps: true # Enable CUDA graphs for prefill/context processing
enable_chunked_prefill: true # Split long prefills into chunks for better memory management
enable_chunked_prefill: false # Split long prefills into chunks for better memory management
unified_memory_level: 0 # Unified memory usage level (0=disabled, higher values enable more aggressive paging)
max_tokens: 16384 # Maximum number of tokens to use in a single step. Analogous to vllm's max_num_batched_tokens

2 changes: 0 additions & 2 deletions nemo_rl/models/megatron/config.py
@@ -25,8 +25,6 @@
class MegatronGenerationConfig(TypedDict):
# Total GPU memory (in GB) allocated for KV cache buffers
buffer_size_gb: int
# Fraction of buffer reserved for guaranteed active requests
buffer_guaranteed_fraction: float
# Number of CUDA graphs to pre-compile for different batch sizes
num_cuda_graphs: int
# Size of each KV cache block in tokens (affects memory granularity)
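With buffer_guaranteed_fraction removed, MegatronGenerationConfig is left with the fields that the trimmed YAML block above still sets. A minimal sketch of the resulting shape, with the field list inferred from this diff and the YAML (illustrative, not the full file):

    # Sketch of the trimmed config shape implied by this diff; illustrative, not the full file.
    from typing import TypedDict

    class MegatronGenerationConfig(TypedDict):
        buffer_size_gb: int                          # total GPU memory (GB) for KV cache buffers
        num_cuda_graphs: int                         # CUDA graphs pre-compiled for different batch sizes
        block_size_tokens: int                       # KV cache block size in tokens
        use_cuda_graphs_for_non_decode_steps: bool   # CUDA graphs for prefill/context steps
        enable_chunked_prefill: bool                 # split long prefills into chunks
        unified_memory_level: int                    # 0 = disabled
        max_tokens: int                              # max tokens per step (like vLLM's max_num_batched_tokens)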
16 changes: 14 additions & 2 deletions nemo_rl/models/megatron/setup.py
@@ -51,6 +51,7 @@
from megatron.bridge.utils.instantiate_utils import InstantiationMode
from megatron.bridge.utils.vocab_utils import calculate_padded_vocab_size
from megatron.core import parallel_state
from megatron.core.process_groups_config import ProcessGroupCollection
from megatron.core.transformer import MegatronModule
from megatron.core.transformer.module import Float16Module
from megatron.core.transformer.transformer_config import TransformerConfig
@@ -663,12 +664,15 @@ def setup_model_and_optimizer(
checkpointing_context = init_checkpointing_context(megatron_cfg.checkpoint)

# Tokenizer
if megatron_cfg.tokenizer.hf_tokenizer_kwargs is None:
megatron_cfg.tokenizer.hf_tokenizer_kwargs = {}
megatron_cfg.tokenizer.hf_tokenizer_kwargs["trust_remote_code"] = True
megatron_cfg.tokenizer.hf_tokenizer_kwargs["use_fast"] = True
build_tokenizer(
megatron_cfg.tokenizer,
make_vocab_size_divisible_by=megatron_cfg.model.make_vocab_size_divisible_by
// megatron_cfg.model.tensor_model_parallel_size,
tensor_model_parallel_size=megatron_cfg.model.tensor_model_parallel_size,
trust_remote_code=True,
)
assert megatron_cfg.model.vocab_size, "vocab size must be specified in model config"

@@ -731,6 +735,8 @@ def composed_peft_hook(model: list[MegatronModule]) -> list[MegatronModule]:
pre_wrap_hook.extend([composed_peft_hook])

# Model, optimizer, and learning rate.
pg_collection = ProcessGroupCollection.use_mpu_process_groups()
setattr(megatron_cfg.model, "_pg_collection", pg_collection)
model = get_model(
megatron_cfg.model,
megatron_cfg.ddp,
@@ -739,6 +745,7 @@ def composed_peft_hook(model: list[MegatronModule]) -> list[MegatronModule]:
data_parallel_random_init=megatron_cfg.rng.data_parallel_random_init,
pre_wrap_hook=pre_wrap_hook,
mixed_precision_wrapper=mixed_precision_wrapper,
pg_collection=pg_collection,
)
if load_optimizer:
optimizer, scheduler = setup_optimizer(
@@ -872,6 +879,7 @@ def setup_reference_model_state(
overlap_param_gather_with_optimizer_step=megatron_cfg.optimizer.overlap_param_gather_with_optimizer_step,
pre_wrap_hook=megatron_cfg.rng.data_parallel_random_init,
mixed_precision_wrapper=ref_mixed_precision_wrapper,
pg_collection=ProcessGroupCollection.use_mpu_process_groups(),
)

print("Loading the Reference Model")
@@ -925,19 +933,23 @@ def finalize_megatron_setup(
megatron_cfg.ddp,
optimizer,
align_grad_reduce=megatron_cfg.dist.align_grad_reduce,
pg_collection=ProcessGroupCollection.use_mpu_process_groups(),
)

tokenizer_config = TokenizerConfig(
tokenizer_type="HuggingFaceTokenizer",
tokenizer_model=hf_model_name,
hf_tokenizer_kwargs={
"trust_remote_code": True,
"use_fast": True,
},
)

megatron_tokenizer = build_tokenizer(
tokenizer_config,
make_vocab_size_divisible_by=megatron_cfg.model.make_vocab_size_divisible_by
// config["megatron_cfg"]["tensor_model_parallel_size"],
tensor_model_parallel_size=config["megatron_cfg"]["tensor_model_parallel_size"],
trust_remote_code=True,
)

dp_size = worker_sharding_annotations.get_axis_size("data_parallel")
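The pattern this file adopts is to build one ProcessGroupCollection from the already-initialized model-parallel state, stash it on the model config, and pass it explicitly to get_model and the reference-model/optimizer helpers instead of having them read global parallel state. A minimal sketch of that plumbing, with call signatures assumed from the calls shown in this diff:

    # Sketch of the process-group plumbing added in this file; call signatures assumed from the diff.
    from megatron.core.process_groups_config import ProcessGroupCollection

    def build_model_with_explicit_pgs(megatron_cfg, get_model, **get_model_kwargs):
        # Derive the collection from the already-initialized model-parallel (MPU) state.
        pg_collection = ProcessGroupCollection.use_mpu_process_groups()
        # Stash it on the model config so downstream wrappers can pick it up.
        setattr(megatron_cfg.model, "_pg_collection", pg_collection)
        # Pass the same collection explicitly instead of relying on global parallel state.
        return get_model(
            megatron_cfg.model,
            megatron_cfg.ddp,
            pg_collection=pg_collection,
            **get_model_kwargs,
        )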
33 changes: 18 additions & 15 deletions nemo_rl/models/policy/workers/megatron_policy_worker.py
@@ -27,6 +27,7 @@
maybe_finalize_async_save,
save_checkpoint,
)
from megatron.bridge.training.utils.pg_utils import get_pg_collection
from megatron.bridge.training.utils.train_utils import (
logical_and_across_model_parallel_group,
reduce_max_stat_across_model_parallel_group,
@@ -415,18 +416,20 @@ def train(
else:
update_successful, grad_norm, num_zeros_in_grad = (True, 0.0, 0.0)

pg_collection = get_pg_collection(self.model)

# when freezing sub-models we may have a mixture of successful and unsuccessful ranks,
# so we must gather across mp ranks
update_successful = logical_and_across_model_parallel_group(
update_successful
update_successful, mp_group=pg_collection.mp
)
# grad_norm and num_zeros_in_grad will be None on ranks without trainable params,
# so we must gather across mp ranks
grad_norm: float = reduce_max_stat_across_model_parallel_group(
grad_norm
grad_norm, mp_group=pg_collection.mp
)
num_zeros_in_grad: float = reduce_max_stat_across_model_parallel_group(
num_zeros_in_grad
num_zeros_in_grad, mp_group=pg_collection.mp
)

if update_successful:
@@ -1036,9 +1039,6 @@ def generate(
]
enable_chunked_prefill = mcore_generation_config["enable_chunked_prefill"]
unified_memory_level = mcore_generation_config["unified_memory_level"]
buffer_guaranteed_fraction = mcore_generation_config[
"buffer_guaranteed_fraction"
]
max_tokens = mcore_generation_config["max_tokens"]

model_config = self.model.config
@@ -1050,7 +1050,6 @@
kv_channels=model_config.kv_channels,
num_attention_heads=model_config.num_query_groups,
max_sequence_length=self.cfg["generation"]["max_new_tokens"],
buffer_guaranteed_fraction=buffer_guaranteed_fraction,
buffer_size_gb=buffer_size_gb,
materialize_only_last_token_logits=False,
num_cuda_graphs=num_cuda_graphs,
@@ -1061,7 +1060,7 @@
use_cuda_graphs_for_non_decode_steps=use_cuda_graphs_for_non_decode_steps,
use_flashinfer_fused_rope=False,
unified_memory_level=unified_memory_level,
max_tokens_override=max_tokens,
max_tokens=max_tokens,
)
inference_wrapped_model = GPTInferenceWrapper(
self.model, inference_wrapper_config, dynamic_context
@@ -1134,23 +1133,27 @@

result = []
while dynamic_engine.has_unfinished_requests():
result_step = dynamic_engine.step_modern(verbose=False)
finished_requests = result_step.get("finished_requests", [])
for finished_request in finished_requests:
result.append(finished_request)
result_step = dynamic_engine.step_modern()
result.extend(result_step["finished_request_records"])

# Sort results by request_id to maintain original batch order
result.sort(key=lambda x: x.request_id)

out = {
"tokens": [x.prompt_tokens.tolist() + x.generated_tokens for x in result],
"logprobs": [x.prompt_log_probs + x.generated_log_probs for x in result],
"tokens": [
x.requests[0].prompt_tokens.tolist() + x.requests[0].generated_tokens
for x in result
],
"logprobs": [
x.requests[0].prompt_log_probs + x.requests[0].generated_log_probs
for x in result
],
}

input_lengths = data["input_lengths"]
# pad the out "tokens" and "logprobs" and make them into tensors from lists
batch_size = data["input_ids"].size(0)
max_gen_seq_len = max([len(x.generated_tokens) for x in result])
max_gen_seq_len = max([len(x.requests[0].generated_tokens) for x in result])
padded_input_length = input_ids.size(1)

max_seq_len = padded_input_length + max_gen_seq_len
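The generation drain loop now works on the finished_request_records returned by step_modern(), where each record carries its request objects and the token/log-prob fields live on record.requests[0]. A minimal sketch of the new result handling, with engine and record attributes assumed from this diff:

    # Sketch of the updated drain loop; engine/record attributes assumed from this diff.
    def drain_finished_requests(dynamic_engine):
        records = []
        while dynamic_engine.has_unfinished_requests():
            step_result = dynamic_engine.step_modern()
            records.extend(step_result["finished_request_records"])

        # Restore the original batch order.
        records.sort(key=lambda r: r.request_id)

        return {
            "tokens": [
                r.requests[0].prompt_tokens.tolist() + r.requests[0].generated_tokens
                for r in records
            ],
            "logprobs": [
                r.requests[0].prompt_log_probs + r.requests[0].generated_log_probs
                for r in records
            ],
        }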
6 changes: 6 additions & 0 deletions tests/unit/models/megatron/test_megatron_setup.py
@@ -920,6 +920,7 @@ def test_reinitialize_parallel_state_after_import(
class TestSetupModelAndOptimizer:
"""Tests for setup_model_and_optimizer function."""

@patch("nemo_rl.models.megatron.setup.ProcessGroupCollection")
@patch("nemo_rl.models.megatron.setup.GlobalState")
@patch("nemo_rl.models.megatron.setup.initialize_megatron")
@patch("nemo_rl.models.megatron.setup.set_jit_fusion_options")
@@ -946,6 +947,7 @@ def test_setup_with_param_sync_and_frozen_moe_router(
mock_set_jit,
mock_init_megatron,
mock_global_state,
mock_pg_collection,
):
"""Test setup_model_and_optimizer with MoE router freezing."""
from nemo_rl.models.megatron.setup import setup_model_and_optimizer
@@ -1006,6 +1008,7 @@ def test_setup_with_param_sync_and_frozen_moe_router(
class TestSetupReferenceModelState:
"""Tests for setup_reference_model_state function."""

@patch("nemo_rl.models.megatron.setup.ProcessGroupCollection")
@patch("nemo_rl.models.megatron.setup.init_checkpointing_context")
@patch("nemo_rl.models.megatron.setup.GlobalState")
@patch("nemo_rl.models.megatron.setup.get_model")
@@ -1019,6 +1022,7 @@ def test_setup_reference_model(
mock_get_model,
mock_global_state,
mock_init_ckpt_context,
mock_pg_collection,
capsys,
):
"""Test setup_reference_model_state when checkpoint exists."""
@@ -1075,6 +1079,7 @@ def test_setup_reference_model(
class TestFinalizeMegatronSetup:
"""Tests for finalize_megatron_setup function."""

@patch("nemo_rl.models.megatron.setup.ProcessGroupCollection")
@patch("nemo_rl.models.megatron.setup._update_model_config_funcs")
@patch("nemo_rl.models.megatron.setup.build_tokenizer")
@patch("nemo_rl.models.megatron.setup.AutoBridge")
@@ -1083,6 +1088,7 @@ def test_basic_finalize_setup(
mock_auto_bridge,
mock_build_tokenizer,
mock_update_model_config,
mock_pg_collection,
):
"""Test basic finalize_megatron_setup."""
from nemo_rl.models.megatron.setup import finalize_megatron_setup
7 changes: 3 additions & 4 deletions tests/unit/models/policy/test_megatron_worker.py
@@ -87,14 +87,13 @@ def create_megatron_test_config(
"stop_token_ids": None,
"stop_strings": None,
"mcore_generation_config": {
"buffer_size_gb": 20,
"buffer_guaranteed_fraction": 0.1,
"buffer_size_gb": 2,
"num_cuda_graphs": 16,
"block_size_tokens": 256,
"block_size_tokens": 1024,
"use_cuda_graphs_for_non_decode_steps": True,
"enable_chunked_prefill": True,
"unified_memory_level": 0,
"max_tokens": 16384,
"max_tokens": 65536,
},
"colocated": {
"enabled": True,