From ff35c790b8c623ded0abebd46b806353cfcaa77f Mon Sep 17 00:00:00 2001 From: Helen Ngo Date: Wed, 26 Nov 2025 08:49:45 -0800 Subject: [PATCH 1/9] Add logging for tokens per second some cleanup cleanup cleanup --- megatron/core/pipeline_parallel/schedules.py | 26 ++++++++++++++------ megatron/rl/rl_utils.py | 8 ++++++ megatron/training/arguments.py | 2 ++ megatron/training/training.py | 14 ++++++++++- pretrain_gpt.py | 3 ++- train_rl.py | 2 +- 6 files changed, 44 insertions(+), 11 deletions(-) diff --git a/megatron/core/pipeline_parallel/schedules.py b/megatron/core/pipeline_parallel/schedules.py index e83f8d90635..68f7a25cdbe 100644 --- a/megatron/core/pipeline_parallel/schedules.py +++ b/megatron/core/pipeline_parallel/schedules.py @@ -399,9 +399,9 @@ def forward_step( context_manager = contextlib.nullcontext() with context_manager: if checkpoint_activations_microbatch is None: - output_tensor, loss_func = forward_step_func(data_iterator, model) + output_tensor, loss_func, num_empty_bins = forward_step_func(data_iterator, model) else: - output_tensor, loss_func = forward_step_func( + output_tensor, loss_func, num_empty_bins = forward_step_func( data_iterator, model, checkpoint_activations_microbatch ) output_tensor, num_tokens = forward_step_calc_loss( @@ -418,8 +418,8 @@ def forward_step( ) if unwrap_output_tensor: - return output_tensor, num_tokens - return [output_tensor], num_tokens + return output_tensor, num_tokens, num_empty_bins + return [output_tensor], num_tokens, num_empty_bins def backward_step(input_tensor, output_tensor, output_tensor_grad, model_type, config): @@ -573,6 +573,7 @@ def forward_backward_no_pipelining( total_num_tokens = torch.zeros([], dtype=torch.int, device="cuda") if config.overlap_moe_expert_parallel_comm and not forward_only: + num_empty_bins = 0 forward_data_store, total_num_tokens = combined_1f1b_schedule_for_no_pipelining( forward_step_func, data_iterator, @@ -592,7 +593,7 @@ def forward_backward_no_pipelining( else: with 
no_sync_func(): for i in range(num_microbatches - 1): - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, data_iterator, model, @@ -612,7 +613,7 @@ def forward_backward_no_pipelining( ) # Run computation for last microbatch out of context handler (want to # synchronize gradients). - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, data_iterator, model, @@ -652,6 +653,8 @@ def forward_backward_no_pipelining( ): create_cudagraphs() + forward_data_store.append(num_empty_bins) + return forward_data_store @@ -1348,6 +1351,7 @@ def forward_backward_helper_wrapper( return forward_output_tensor, backward_input_tensor_grad # ==============================main logic========================================= + num_empty_bins = 0 _is_vp_first_stage = partial( is_vp_first_stage, vp_size=config.virtual_pipeline_model_parallel_size ) @@ -1917,6 +1921,8 @@ def pp_post_backward(input_tensor_grad, vp_stage=None): create_cudagraphs() nvtx_range_pop(suffix="misc") + forward_data_store.append(num_empty_bins) + return forward_data_store @@ -1977,6 +1983,8 @@ def forward_backward_pipelining_without_interleaving( data_iterator = data_iterator[0] config = get_model_config(model) + num_empty_bins = 0 + if config.overlap_p2p_comm: raise ValueError( "Non-interleaved pipeline parallelism does not support overlapping p2p communication" @@ -2132,7 +2140,7 @@ def enable_grad_sync(): input_tensor = p2p_communicator.recv_forward( recv_tensor_shapes, is_pp_first_stage(p2p_communicator.pp_group) ) - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, data_iterator, model, @@ -2175,7 +2183,7 @@ def enable_grad_sync(): else: checkpoint_activations_microbatch = None - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, 
data_iterator, model, @@ -2300,4 +2308,6 @@ def enable_grad_sync(): ): create_cudagraphs() + forward_data_store.append(num_empty_bins) + return forward_data_store diff --git a/megatron/rl/rl_utils.py b/megatron/rl/rl_utils.py index b75d920d388..1d110a55289 100644 --- a/megatron/rl/rl_utils.py +++ b/megatron/rl/rl_utils.py @@ -1453,6 +1453,13 @@ def prepare_data_for_update( bin_idx = current_bins + i packing_info['bin_seq_indices'].append(entry['bin_seq_indices']) packing_info['seq_starts'][bin_idx] = entry['seq_starts'] + else: + num_empty_bins = 0 + + # Sum the total number of empty bins across the ranks + empty_bin_count = torch.tensor([num_empty_bins], device='cuda') + torch.distributed.all_reduce(empty_bin_count, op=torch.distributed.ReduceOp.SUM) + num_empty_bins = empty_bin_count[0].cpu().numpy() packing_context['packing_info'] = packing_info packing_context['original_generation_masks'] = generation_masks @@ -1833,6 +1840,7 @@ def prepare_data_for_update( 'num_sequences': len(packing_info['seq_lengths']), 'avg_seqs_per_bin': global_avg_seqs_per_bin, 'avg_seqs_per_bin_this_rank': actual_seqs_per_bin_this_rank, + 'num_empty_bins': num_empty_bins, # summed across ranks } if args.micro_batch_size != 1: diff --git a/megatron/training/arguments.py b/megatron/training/arguments.py index a4d1a07d832..137c795ac22 100644 --- a/megatron/training/arguments.py +++ b/megatron/training/arguments.py @@ -1885,6 +1885,8 @@ def _add_logging_args(parser): help='Path to save the wandb results locally.') group.add_argument('--logging-level', type=int, default=None, help='Set default logging level') + group.add_argument('--log-tokens-per-second', default=False, action="store_true", + help='Whether to log tokens per second.') return parser diff --git a/megatron/training/training.py b/megatron/training/training.py index d2dd9292979..fd9d8fd9ae3 100644 --- a/megatron/training/training.py +++ b/megatron/training/training.py @@ -1376,10 +1376,13 @@ def train_step(forward_step_func, 
data_iterator, model, optimizer, opt_param_sch if args.empty_unused_memory_level >= 2: torch.cuda.empty_cache() + num_empty_bins = 0 if mpu.is_pipeline_last_stage(ignore_virtual=True): # Average loss across microbatches. loss_reduced = {} + num_empty_bins = losses_reduced.pop() + for key in losses_reduced[0].keys(): val = [x[key].view(-1) for x in losses_reduced] if val[0].numel() == 2: @@ -1419,8 +1422,9 @@ def train_step(forward_step_func, data_iterator, model, optimizer, opt_param_sch exit_code, grad_norm, num_zeros_in_grad, + num_empty_bins ) - return {}, skipped_iter, should_checkpoint, should_exit, exit_code, grad_norm, num_zeros_in_grad + return {}, skipped_iter, should_checkpoint, should_exit, exit_code, grad_norm, num_zeros_in_grad, num_empty_bins def training_log( @@ -1434,6 +1438,7 @@ def training_log( grad_norm, params_norm, num_zeros_in_grad, + num_empty_bins ): """Log training information such as losses, timing, ....""" args = get_args() @@ -1701,6 +1706,8 @@ def training_log( total_loss_dict[skipped_iters_key] ) log_string += ' number of nan iterations: {:3d} |'.format(total_loss_dict[nan_iters_key]) + if args.log_tokens_per_second: + log_string += f' tokens per second: {((batch_size - num_empty_bins) * args.seq_length) / elapsed_time_per_iteration}' total_loss_dict[advanced_iters_key] = 0 total_loss_dict[skipped_iters_key] = 0 total_loss_dict[nan_iters_key] = 0 @@ -2354,6 +2361,7 @@ def get_e2e_base_metrics(): exit_code, grad_norm, num_zeros_in_grad, + num_empty_bins ) = train_step( forward_step_func, train_data_iterator, model, optimizer, opt_param_scheduler, config, forward_backward_func ) @@ -2451,6 +2459,7 @@ def get_e2e_base_metrics(): grad_norm, params_norm, num_zeros_in_grad, + num_empty_bins ) # Evaluation. @@ -2627,6 +2636,9 @@ def evaluate( torch.cuda.empty_cache() if mpu.is_pipeline_last_stage(ignore_virtual=True): + + _ = loss_dicts.pop() + # Reduce across processes. 
for key in loss_dicts[0].keys(): if key not in total_loss_dict: diff --git a/pretrain_gpt.py b/pretrain_gpt.py index 6b602d33243..0ad79022a6e 100644 --- a/pretrain_gpt.py +++ b/pretrain_gpt.py @@ -129,6 +129,7 @@ def forward_step(data_iterator, model: GPTModel, return_schedule_plan: bool = Fa """ args = get_args() timers = get_timers() + num_empty_bins = 0 # Number of padding bins from the data loader # Get the batch. timers('batch-generator', log_level=2).start() @@ -155,7 +156,7 @@ def forward_step(data_iterator, model: GPTModel, return_schedule_plan: bool = Fa ) # [ModelOpt]: model is needed to access ModelOpt distillation losses - return output_tensor, partial(loss_func, loss_mask, model=model) + return output_tensor, partial(loss_func, loss_mask, model=model), num_empty_bins def is_dataset_built_on_rank(vp_stage=None): diff --git a/train_rl.py b/train_rl.py index 33fca0cb840..cde9d2f8388 100644 --- a/train_rl.py +++ b/train_rl.py @@ -309,7 +309,7 @@ def forward_step(data_iterator, model: GPTModel, loss_only: bool = False): entropy_term, truncated_from_above, truncated_from_below, - ) + ), runtime_state.sequence_packing_metadata['num_empty_bins'] def train_valid_test_datasets_provider(train_val_test_num_samples): From ed21a3c5e3c50f7e6174e1b642e78f6796f3c0eb Mon Sep 17 00:00:00 2001 From: Helen Ngo Date: Wed, 3 Dec 2025 14:10:38 -0800 Subject: [PATCH 2/9] Fix non-seq-pack case --- train_rl.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/train_rl.py b/train_rl.py index cde9d2f8388..da1ecf994e6 100644 --- a/train_rl.py +++ b/train_rl.py @@ -300,6 +300,11 @@ def forward_step(data_iterator, model: GPTModel, loss_only: bool = False): ) ) + if runtime_state.sequence_packing_metadata: + num_empty_bins = runtime_state.sequence_packing_metadata['num_empty_bins'] + else: + num_empty_bins = None + # loss_mask will not be applied to 0th token as we do not have a logprob for it. 
return loss, partial( loss_func, @@ -309,7 +314,7 @@ def forward_step(data_iterator, model: GPTModel, loss_only: bool = False): entropy_term, truncated_from_above, truncated_from_below, - ), runtime_state.sequence_packing_metadata['num_empty_bins'] + ), num_empty_bins def train_valid_test_datasets_provider(train_val_test_num_samples): From 06212b7254a82ed28017cb78bb3a05d98ba8b082 Mon Sep 17 00:00:00 2001 From: Helen Ngo Date: Mon, 5 Jan 2026 11:28:44 -0800 Subject: [PATCH 3/9] Fix after merging with seqpack refactor --- megatron/rl/rl_utils.py | 3 +- megatron/rl/sequence_packing_utils.py | 185 ++++++++++++++------------ train_rl.py | 4 +- 3 files changed, 103 insertions(+), 89 deletions(-) diff --git a/megatron/rl/rl_utils.py b/megatron/rl/rl_utils.py index 92955cda900..f95e293050c 100644 --- a/megatron/rl/rl_utils.py +++ b/megatron/rl/rl_utils.py @@ -895,7 +895,7 @@ def prepare_data_for_update( if args.rl_use_sequence_packing: with nvtx_range("sequence_packing", time=True): runtime_state.packing_context = packing_context = pack_all_trajectories( - trajs, + trajs, generation_masks, inference_logprobs, global_advantages, @@ -969,6 +969,7 @@ def logprobs_forward_step(data_iterator, model): packed_seq_params=b_packed_seq_params, ), None, + 0 # These tokens do not count toward the tokens/second calculation ) dtype = ( diff --git a/megatron/rl/sequence_packing_utils.py b/megatron/rl/sequence_packing_utils.py index 1e9063f0947..60ee4092fd8 100644 --- a/megatron/rl/sequence_packing_utils.py +++ b/megatron/rl/sequence_packing_utils.py @@ -70,6 +70,7 @@ class PackingContext: original_inference_logprobs: Optional[torch.Tensor] = None bin_advantages: List[torch.Tensor] = field(default_factory=list) cached_packed_seq_params: List[Optional[PackedSeqParams]] = field(default_factory=list) + stats: Optional[dict] = None def load_packed_data_by_index(bin_idx: int, packing_context: PackingContext, logprobs_is_correction: bool): @@ -156,7 +157,6 @@ def 
log_packing_efficiency(packing_context: PackingContext): packing_efficiency = my_tokens / total_capacity if total_capacity > 0 else 0 avg_seq_length = total_tokens / len(packing_info.seq_lengths) rank = mpu.get_data_parallel_rank() - data_parallel_world_size = mpu.get_data_parallel_world_size() log_single_rank(logger, logging.INFO, f"[Sequence Packing] Statistics:") log_single_rank( @@ -189,98 +189,110 @@ def log_packing_efficiency(packing_context: PackingContext): ) # Add detailed per-rank sequence distribution analysis - if torch.distributed.is_initialized(): - # Gather sequence counts from all ranks - seq_counts_per_bin = [len(indices) for indices in my_bin_seq_indices] - non_empty_bins = [c for c in seq_counts_per_bin if c > 0] - - # Create tensor with rank statistics - rank_stats = torch.tensor( - [ - float(rank), - float(len(my_bin_seq_indices)), # total bins - float(len(non_empty_bins)), # non-empty bins - float(my_sequences), # total sequences - ( - float(min(non_empty_bins)) if non_empty_bins else 0.0 - ), # min sequences per bin - ( - float(max(non_empty_bins)) if non_empty_bins else 0.0 - ), # max sequences per bin - ( - float(my_sequences / len(non_empty_bins)) if non_empty_bins else 0.0 - ), # avg sequences per non-empty bin - ], - device='cuda', - ) - # Gather from all ranks - world_size = mpu.get_data_parallel_world_size() - all_rank_stats = [torch.zeros_like(rank_stats) for _ in range(world_size)] - torch.distributed.all_gather( - all_rank_stats, rank_stats, group=mpu.get_data_parallel_group() - ) + # Gather sequence counts from all ranks + seq_counts_per_bin = [len(indices) for indices in my_bin_seq_indices] + non_empty_bins = [c for c in seq_counts_per_bin if c > 0] + empty_bins_on_rank = len(my_bin_seq_indices) - len(non_empty_bins) + + # Create tensor with rank statistics + rank_stats = torch.tensor( + [ + float(rank), + float(len(my_bin_seq_indices)), # total bins + float(len(non_empty_bins)), # non-empty bins + float(my_sequences), # total 
sequences + ( + float(min(non_empty_bins)) if non_empty_bins else 0.0 + ), # min sequences per bin + ( + float(max(non_empty_bins)) if non_empty_bins else 0.0 + ), # max sequences per bin + ( + float(my_sequences / len(non_empty_bins)) if non_empty_bins else 0.0 + ), # avg sequences per non-empty bin + float(empty_bins_on_rank), # empty bins on each rank + ], + device='cuda', + ) - # Print detailed statistics for each rank - if rank == 0: - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] Per-rank distribution ({packing_info.packing_algo} algorithm):", - ) - log_single_rank( - logger, - logging.INFO, - "[Sequence Packing] Rank | Total Bins | Non-empty | Sequences | Min/Bin | Max/Bin | Avg/Bin", - ) - log_single_rank( - logger, - logging.INFO, - "[Sequence Packing] -----|------------|-----------|-----------|---------|---------|--------", - ) - for stats in all_rank_stats: - r = int(stats[0].item()) - total_bins = int(stats[1].item()) - non_empty = int(stats[2].item()) - sequences = int(stats[3].item()) - min_seq = int(stats[4].item()) - max_seq = int(stats[5].item()) - avg_seq = stats[6].item() - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] {r:3d} | {total_bins:10d} | {non_empty:9d} | {sequences:9d} | {min_seq:7d} | {max_seq:7d} | {avg_seq:6.1f}", - ) + # Gather from all ranks + world_size = mpu.get_data_parallel_world_size() + all_rank_stats = [torch.zeros_like(rank_stats) for _ in range(world_size)] + torch.distributed.all_gather( + all_rank_stats, rank_stats, group=mpu.get_data_parallel_group() + ) + all_rank_stats_tensor = torch.stack(all_rank_stats, dim=0) - # Also show first few bins for rank 0 as example + # Print detailed statistics for each rank + if rank == 0: + log_single_rank( + logger, + logging.INFO, + f"[Sequence Packing] Per-rank distribution ({packing_info.packing_algo} algorithm):", + ) + log_single_rank( + logger, + logging.INFO, + "[Sequence Packing] Rank | Total Bins | Non-empty | Sequences | Min/Bin | 
Max/Bin | Avg/Bin", + ) + log_single_rank( + logger, + logging.INFO, + "[Sequence Packing] -----|------------|-----------|-----------|---------|---------|--------", + ) + for stats in all_rank_stats: + r = int(stats[0].item()) + total_bins = int(stats[1].item()) + non_empty = int(stats[2].item()) + sequences = int(stats[3].item()) + min_seq = int(stats[4].item()) + max_seq = int(stats[5].item()) + avg_seq = stats[6].item() log_single_rank( logger, logging.INFO, - f"[Sequence Packing] Example (Rank 0 first 10 bins): {seq_counts_per_bin[:10]}", + f"[Sequence Packing] {r:3d} | {total_bins:10d} | {non_empty:9d} | {sequences:9d} | {min_seq:7d} | {max_seq:7d} | {avg_seq:6.1f}", ) - # Show the improvement from round-robin - total_seqs_all_ranks = sum(int(stats[3].item()) for stats in all_rank_stats) - avg_seqs_per_rank = total_seqs_all_ranks / world_size - max_deviation = max( - abs(int(stats[3].item()) - avg_seqs_per_rank) - for stats in all_rank_stats - ) - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] Round-robin distribution quality:", - ) - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] - Average sequences per rank: {avg_seqs_per_rank:.1f}", - ) - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] - Max deviation from average: {max_deviation:.0f} sequences ({max_deviation/avg_seqs_per_rank*100:.1f}%)", - ) + # Also show first few bins for rank 0 as example + log_single_rank( + logger, + logging.INFO, + f"[Sequence Packing] Example (Rank 0 first 10 bins): {seq_counts_per_bin[:10]}", + ) + + # Show the improvement from round-robin + total_seqs_all_ranks = sum(int(stats[3].item()) for stats in all_rank_stats) + avg_seqs_per_rank = total_seqs_all_ranks / world_size + max_deviation = max( + abs(int(stats[3].item()) - avg_seqs_per_rank) + for stats in all_rank_stats + ) + log_single_rank( + logger, + logging.INFO, + f"[Sequence Packing] Round-robin distribution quality:", + ) + log_single_rank( + logger, + logging.INFO, 
+ f"[Sequence Packing] - Average sequences per rank: {avg_seqs_per_rank:.1f}", + ) + log_single_rank( + logger, + logging.INFO, + f"[Sequence Packing] - Max deviation from average: {max_deviation:.0f} sequences ({max_deviation/avg_seqs_per_rank*100:.1f}%)", + ) + + result = { + "total_num_bins": int(torch.sum(all_rank_stats_tensor[:, 1]).item()), + "total_non_empty_bins": int(torch.sum(all_rank_stats_tensor[:, 2]).item()), + "total_empty_bins": int(torch.sum(all_rank_stats_tensor[:, 7]).item()), + "total_sequences": int(torch.sum(all_rank_stats_tensor[:, 3]).item()), + } + + return result def get_actual_sequence_lengths(sequences: torch.Tensor, pad_token: int) -> List[int]: """Get actual sequence lengths for pre-padded sequences. @@ -1058,7 +1070,8 @@ def pack_all_trajectories(trajs, generation_masks, inference_logprobs, global_ad cached_packed_seq_params=cached_packed_seq_params, ) - log_packing_efficiency(packing_context) + stats_aggregated_over_all_ranks = log_packing_efficiency(packing_context) + packing_context.stats = stats_aggregated_over_all_ranks return packing_context diff --git a/train_rl.py b/train_rl.py index 9f6c74125dc..1f99ec50eb3 100644 --- a/train_rl.py +++ b/train_rl.py @@ -301,8 +301,8 @@ def forward_step(data_iterator, model: GPTModel, loss_only: bool = False): ) output_tensor = loss - if runtime_state.sequence_packing_metadata: - num_empty_bins = runtime_state.sequence_packing_metadata['num_empty_bins'] + if runtime_state.packing_context: + num_empty_bins = runtime_state.packing_context.stats['total_empty_bins'] else: num_empty_bins = None From 64e0d0a6d35e28ccd3481dfdb79949d666ded221 Mon Sep 17 00:00:00 2001 From: Helen Ngo Date: Tue, 6 Jan 2026 08:51:47 -0800 Subject: [PATCH 4/9] Fix tests --- tests/unit_tests/export/trtllm/test_distributed_fp8.py | 2 +- tests/unit_tests/export/trtllm/test_single_device_fp8.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit_tests/export/trtllm/test_distributed_fp8.py 
b/tests/unit_tests/export/trtllm/test_distributed_fp8.py index cf47a864108..5b69b2a3d5b 100644 --- a/tests/unit_tests/export/trtllm/test_distributed_fp8.py +++ b/tests/unit_tests/export/trtllm/test_distributed_fp8.py @@ -102,7 +102,7 @@ def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor): loss_mask = data['loss_mask'].to(DEVICE) output_tensor = model(tokens, position_ids, attention_mask, labels=labels) - return output_tensor, partial(loss_func, loss_mask) + return output_tensor, partial(loss_func, loss_mask), None class TestTRTLLMSingleDeviceConverterFP8: diff --git a/tests/unit_tests/export/trtllm/test_single_device_fp8.py b/tests/unit_tests/export/trtllm/test_single_device_fp8.py index 04bbfdb127d..608828a887e 100644 --- a/tests/unit_tests/export/trtllm/test_single_device_fp8.py +++ b/tests/unit_tests/export/trtllm/test_single_device_fp8.py @@ -99,7 +99,7 @@ def _loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor): loss_mask = data['loss_mask'].to(DEVICE) output_tensor = model(tokens, position_ids, attention_mask, labels=labels) - return output_tensor, partial(_loss_func, loss_mask) + return output_tensor, partial(_loss_func, loss_mask), None class TestTRTLLMSingleDeviceConverterFP8: From d6464018d0b123cd2f1c62bc8e8e477a2efd49e9 Mon Sep 17 00:00:00 2001 From: Helen Ngo Date: Tue, 6 Jan 2026 09:49:18 -0800 Subject: [PATCH 5/9] Fix transformer unit test --- tests/unit_tests/transformer/test_full_cuda_graph.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit_tests/transformer/test_full_cuda_graph.py b/tests/unit_tests/transformer/test_full_cuda_graph.py index 312ae467304..1d92c6edee0 100644 --- a/tests/unit_tests/transformer/test_full_cuda_graph.py +++ b/tests/unit_tests/transformer/test_full_cuda_graph.py @@ -37,7 +37,7 @@ def forward_step_func(data_iterator, model): def loss_func(output_tensor): return rank, {'loss_reduced': rank} - return model(dummy_data), loss_func + return model(dummy_data), loss_func, 
None model = torch.nn.Linear(4, 1) From ef349fcc0c11beb32d1d3e527cec3ea81aa9e126 Mon Sep 17 00:00:00 2001 From: Helen Ngo Date: Tue, 6 Jan 2026 12:15:13 -0800 Subject: [PATCH 6/9] Fix PP unit test --- megatron/core/pipeline_parallel/schedules.py | 2 +- tests/unit_tests/pipeline_parallel/test_pipeline_layout.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/megatron/core/pipeline_parallel/schedules.py b/megatron/core/pipeline_parallel/schedules.py index d2c3d3a8eb8..e23ca8ddf83 100644 --- a/megatron/core/pipeline_parallel/schedules.py +++ b/megatron/core/pipeline_parallel/schedules.py @@ -1212,7 +1212,7 @@ def forward_step_helper(virtual_microbatch_id, checkpoint_activations_microbatch virtual_microbatch_id, model_chunk_id, microbatch_id ) - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, data_iterator[model_chunk_id], model[model_chunk_id], diff --git a/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py b/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py index faf83837a23..8d128bc7a81 100644 --- a/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py +++ b/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py @@ -224,7 +224,7 @@ def loss_func(output_tensor: torch.Tensor): loss = output_tensor.sum() return output_tensor, loss - return output_tensor, loss_func + return output_tensor, loss_func, None iteration = 123 layer_spec_fn = get_gpt_decoder_block_spec if is_moe else gpt_te_spec @@ -293,6 +293,8 @@ def loss_func(output_tensor: torch.Tensor): forward_only=True, ) + losses_reduced_baseline.pop() # Empty bins not required for this test + if parallel_state.is_pipeline_last_stage(ignore_virtual=True): for loss, loss_baseline in zip(losses_reduced, losses_reduced_baseline): assert torch.equal(loss, loss_baseline) From c1075f2a8ff12fe00cbf988fe4e263b3c2c9ac20 Mon Sep 17 00:00:00 2001 From: Helen Ngo Date: Tue, 6 Jan 2026 12:36:53 -0800 Subject: 
[PATCH 7/9] Update copyright header --- tests/unit_tests/export/trtllm/test_distributed_fp8.py | 2 ++ tests/unit_tests/export/trtllm/test_single_device_fp8.py | 2 ++ 2 files changed, 4 insertions(+) diff --git a/tests/unit_tests/export/trtllm/test_distributed_fp8.py b/tests/unit_tests/export/trtllm/test_distributed_fp8.py index 5b69b2a3d5b..22bf9c977c8 100644 --- a/tests/unit_tests/export/trtllm/test_distributed_fp8.py +++ b/tests/unit_tests/export/trtllm/test_distributed_fp8.py @@ -1,3 +1,5 @@ +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + from functools import partial import pytest diff --git a/tests/unit_tests/export/trtllm/test_single_device_fp8.py b/tests/unit_tests/export/trtllm/test_single_device_fp8.py index 608828a887e..6e75a79d8bf 100644 --- a/tests/unit_tests/export/trtllm/test_single_device_fp8.py +++ b/tests/unit_tests/export/trtllm/test_single_device_fp8.py @@ -1,3 +1,5 @@ +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+ from functools import partial import pytest From 2524163a5765a2708334cec796b091ab24775718 Mon Sep 17 00:00:00 2001 From: Helen Ngo Date: Tue, 6 Jan 2026 13:39:35 -0800 Subject: [PATCH 8/9] Remove num_empty_bins for PP test --- tests/unit_tests/pipeline_parallel/test_pipeline_layout.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py b/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py index 8d128bc7a81..1a20dbf387c 100644 --- a/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py +++ b/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py @@ -253,6 +253,7 @@ def loss_func(output_tensor: torch.Tensor): micro_batch_size=1, forward_only=True, ) + losses_reduced.pop() # Empty bins not required for this test optimizer = None opt_param_scheduler = None From 7e93933e8b1eb4c51b70a58b3079bf83e2659b5b Mon Sep 17 00:00:00 2001 From: Helen Ngo Date: Tue, 6 Jan 2026 15:18:16 -0800 Subject: [PATCH 9/9] Fix empty bins unnecessary for PP test --- megatron/core/pipeline_parallel/schedules.py | 3 ++- tests/unit_tests/pipeline_parallel/test_schedules.py | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/megatron/core/pipeline_parallel/schedules.py b/megatron/core/pipeline_parallel/schedules.py index e23ca8ddf83..221904710f5 100644 --- a/megatron/core/pipeline_parallel/schedules.py +++ b/megatron/core/pipeline_parallel/schedules.py @@ -402,7 +402,8 @@ def forward_step( context_manager = contextlib.nullcontext() with context_manager: if checkpoint_activations_microbatch is None: - output_tensor, loss_func, num_empty_bins = forward_step_func(data_iterator, model) + output = forward_step_func(data_iterator, model) + output_tensor, loss_func, num_empty_bins = output else: output_tensor, loss_func, num_empty_bins = forward_step_func( data_iterator, model, checkpoint_activations_microbatch diff --git a/tests/unit_tests/pipeline_parallel/test_schedules.py 
b/tests/unit_tests/pipeline_parallel/test_schedules.py index b861aa2df49..846af3906b5 100644 --- a/tests/unit_tests/pipeline_parallel/test_schedules.py +++ b/tests/unit_tests/pipeline_parallel/test_schedules.py @@ -144,7 +144,7 @@ def forward_step_func(data_iterator, model): def loss_func(output_tensor): return rank, {'loss_reduced': rank} - return model(dummy_data), loss_func + return model(dummy_data), loss_func, None model = torch.nn.Linear(4, 1) model.model_type = 'unit-test' @@ -171,6 +171,8 @@ def set_input_tensor(input_tensor): forward_only=True, ) + losses_reduced.pop() # The empty-bin count is not used by this test + loss_reduced_expected = [ {'loss_reduced': rank}, {'loss_reduced': rank},