diff --git a/megatron/core/pipeline_parallel/schedules.py b/megatron/core/pipeline_parallel/schedules.py index 051e3a974a8..221904710f5 100644 --- a/megatron/core/pipeline_parallel/schedules.py +++ b/megatron/core/pipeline_parallel/schedules.py @@ -402,9 +402,10 @@ def forward_step( context_manager = contextlib.nullcontext() with context_manager: if checkpoint_activations_microbatch is None: - output_tensor, loss_func = forward_step_func(data_iterator, model) + output = forward_step_func(data_iterator, model) + output_tensor, loss_func, num_empty_bins = output else: - output_tensor, loss_func = forward_step_func( + output_tensor, loss_func, num_empty_bins = forward_step_func( data_iterator, model, checkpoint_activations_microbatch ) output_tensor, num_tokens = forward_step_calc_loss( @@ -421,8 +422,8 @@ def forward_step( ) if unwrap_output_tensor: - return output_tensor, num_tokens - return [output_tensor], num_tokens + return output_tensor, num_tokens, num_empty_bins + return [output_tensor], num_tokens, num_empty_bins def backward_step(input_tensor, output_tensor, output_tensor_grad, model_type, config): @@ -576,6 +577,7 @@ def forward_backward_no_pipelining( total_num_tokens = torch.zeros([], dtype=torch.int, device="cuda") if config.overlap_moe_expert_parallel_comm and not forward_only: + num_empty_bins = 0 forward_data_store, total_num_tokens = combined_1f1b_schedule_for_no_pipelining( forward_step_func, data_iterator, @@ -595,7 +597,7 @@ def forward_backward_no_pipelining( else: with no_sync_func(): for i in range(num_microbatches - 1): - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, data_iterator, model, @@ -615,7 +617,7 @@ def forward_backward_no_pipelining( ) # Run computation for last microbatch out of context handler (want to # synchronize gradients). - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, data_iterator, model, @@ -655,6 +657,8 @@ def forward_backward_no_pipelining( ): create_cudagraphs() + forward_data_store.append(num_empty_bins) + return forward_data_store @@ -1209,7 +1213,7 @@ def forward_step_helper(virtual_microbatch_id, checkpoint_activations_microbatch virtual_microbatch_id, model_chunk_id, microbatch_id ) - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, data_iterator[model_chunk_id], model[model_chunk_id], @@ -1351,6 +1355,7 @@ def forward_backward_helper_wrapper( return forward_output_tensor, backward_input_tensor_grad # ==============================main logic========================================= + num_empty_bins = 0 _is_vp_first_stage = partial( is_vp_first_stage, vp_size=config.virtual_pipeline_model_parallel_size ) @@ -1920,6 +1925,8 @@ def pp_post_backward(input_tensor_grad, vp_stage=None): create_cudagraphs() nvtx_range_pop(suffix="misc") + forward_data_store.append(num_empty_bins) + return forward_data_store @@ -1980,6 +1987,8 @@ def forward_backward_pipelining_without_interleaving( data_iterator = data_iterator[0] config = get_model_config(model) + num_empty_bins = 0 + if config.overlap_p2p_comm: raise ValueError( "Non-interleaved pipeline parallelism does not support overlapping p2p communication" @@ -2135,7 +2144,7 @@ def enable_grad_sync(): input_tensor = p2p_communicator.recv_forward( recv_tensor_shapes, is_pp_first_stage(p2p_communicator.pp_group) ) - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, data_iterator, model, @@ -2178,7 +2187,7 @@ def enable_grad_sync(): else: checkpoint_activations_microbatch = None - output_tensor, num_tokens = forward_step( + output_tensor, num_tokens, num_empty_bins = forward_step( forward_step_func, data_iterator, model, @@ -2303,4 +2312,6 @@ def enable_grad_sync(): ): create_cudagraphs() + forward_data_store.append(num_empty_bins) + return forward_data_store diff --git a/megatron/rl/rl_utils.py b/megatron/rl/rl_utils.py index fe8a2dd6c02..f95e293050c 100644 --- a/megatron/rl/rl_utils.py +++ b/megatron/rl/rl_utils.py @@ -875,6 +875,7 @@ def prepare_data_for_update( (mpu.get_data_parallel_rank() + 1) * data_split_size, ) rollouts = rollouts[data_split_range[0] : data_split_range[1]] + # First we calculate them on a global level and then we split and recalculate on a local level. # Sequence packing and reporting needs it global but non-packing wants it local. rewards = torch.tensor([[r.reward for r in group] for group in rollouts], device='cpu') @@ -894,7 +895,7 @@ def prepare_data_for_update( if args.rl_use_sequence_packing: with nvtx_range("sequence_packing", time=True): runtime_state.packing_context = packing_context = pack_all_trajectories( - trajs, + trajs, generation_masks, inference_logprobs, global_advantages, @@ -968,6 +969,7 @@ def logprobs_forward_step(data_iterator, model): packed_seq_params=b_packed_seq_params, ), None, + 0 # These tokens do not count toward the tokens/second calculation ) dtype = ( diff --git a/megatron/rl/sequence_packing_utils.py b/megatron/rl/sequence_packing_utils.py index 1e9063f0947..60ee4092fd8 100644 --- a/megatron/rl/sequence_packing_utils.py +++ b/megatron/rl/sequence_packing_utils.py @@ -70,6 +70,7 @@ class PackingContext: original_inference_logprobs: Optional[torch.Tensor] = None bin_advantages: List[torch.Tensor] = field(default_factory=list) cached_packed_seq_params: List[Optional[PackedSeqParams]] = field(default_factory=list) + stats: Optional[dict] = None def load_packed_data_by_index(bin_idx: int, packing_context: PackingContext, logprobs_is_correction: bool): @@ -156,7 +157,6 @@ def log_packing_efficiency(packing_context: PackingContext): packing_efficiency = my_tokens / total_capacity if total_capacity > 0 else 0 avg_seq_length = total_tokens / len(packing_info.seq_lengths) rank = mpu.get_data_parallel_rank() - data_parallel_world_size = mpu.get_data_parallel_world_size() log_single_rank(logger, logging.INFO, f"[Sequence Packing] Statistics:") log_single_rank( @@ -189,98 +189,110 @@ def log_packing_efficiency(packing_context: PackingContext): ) # Add detailed per-rank sequence distribution analysis - if torch.distributed.is_initialized(): - # Gather sequence counts from all ranks - seq_counts_per_bin = [len(indices) for indices in my_bin_seq_indices] - non_empty_bins = [c for c in seq_counts_per_bin if c > 0] - - # Create tensor with rank statistics - rank_stats = torch.tensor( - [ - float(rank), - float(len(my_bin_seq_indices)), # total bins - float(len(non_empty_bins)), # non-empty bins - float(my_sequences), # total sequences - ( - float(min(non_empty_bins)) if non_empty_bins else 0.0 - ), # min sequences per bin - ( - float(max(non_empty_bins)) if non_empty_bins else 0.0 - ), # max sequences per bin - ( - float(my_sequences / len(non_empty_bins)) if non_empty_bins else 0.0 - ), # avg sequences per non-empty bin - ], - device='cuda', - ) - # Gather from all ranks - world_size = mpu.get_data_parallel_world_size() - all_rank_stats = [torch.zeros_like(rank_stats) for _ in range(world_size)] - torch.distributed.all_gather( - all_rank_stats, rank_stats, group=mpu.get_data_parallel_group() - ) + # Gather sequence counts from all ranks + seq_counts_per_bin = [len(indices) for indices in my_bin_seq_indices] + non_empty_bins = [c for c in seq_counts_per_bin if c > 0] + empty_bins_on_rank = len(my_bin_seq_indices) - len(non_empty_bins) + + # Create tensor with rank statistics + rank_stats = torch.tensor( + [ + float(rank), + float(len(my_bin_seq_indices)), # total bins + float(len(non_empty_bins)), # non-empty bins + float(my_sequences), # total sequences + ( + float(min(non_empty_bins)) if non_empty_bins else 0.0 + ), # min sequences per bin + ( + float(max(non_empty_bins)) if non_empty_bins else 0.0 + ), # max sequences per bin + ( + float(my_sequences / len(non_empty_bins)) if non_empty_bins else 0.0 + ), # avg sequences per non-empty bin + float(empty_bins_on_rank), # empty bins on each rank + ], + device='cuda', + ) - # Print detailed statistics for each rank - if rank == 0: - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] Per-rank distribution ({packing_info.packing_algo} algorithm):", - ) - log_single_rank( - logger, - logging.INFO, - "[Sequence Packing] Rank | Total Bins | Non-empty | Sequences | Min/Bin | Max/Bin | Avg/Bin", - ) - log_single_rank( - logger, - logging.INFO, - "[Sequence Packing] -----|------------|-----------|-----------|---------|---------|--------", - ) - for stats in all_rank_stats: - r = int(stats[0].item()) - total_bins = int(stats[1].item()) - non_empty = int(stats[2].item()) - sequences = int(stats[3].item()) - min_seq = int(stats[4].item()) - max_seq = int(stats[5].item()) - avg_seq = stats[6].item() - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] {r:3d} | {total_bins:10d} | {non_empty:9d} | {sequences:9d} | {min_seq:7d} | {max_seq:7d} | {avg_seq:6.1f}", - ) + # Gather from all ranks + world_size = mpu.get_data_parallel_world_size() + all_rank_stats = [torch.zeros_like(rank_stats) for _ in range(world_size)] + torch.distributed.all_gather( + all_rank_stats, rank_stats, group=mpu.get_data_parallel_group() + ) + all_rank_stats_tensor = torch.stack(all_rank_stats, dim=0) - # Also show first few bins for rank 0 as example + # Print detailed statistics for each rank + if rank == 0: + log_single_rank( + logger, + logging.INFO, + f"[Sequence Packing] Per-rank distribution ({packing_info.packing_algo} algorithm):", + ) + log_single_rank( + logger, + logging.INFO, + "[Sequence Packing] Rank | Total Bins | Non-empty | Sequences | Min/Bin | Max/Bin | Avg/Bin", + ) + log_single_rank( + logger, + logging.INFO, + "[Sequence Packing] -----|------------|-----------|-----------|---------|---------|--------", + ) + for stats in all_rank_stats: + r = int(stats[0].item()) + total_bins = int(stats[1].item()) + non_empty = int(stats[2].item()) + sequences = int(stats[3].item()) + min_seq = int(stats[4].item()) + max_seq = int(stats[5].item()) + avg_seq = stats[6].item() log_single_rank( logger, logging.INFO, - f"[Sequence Packing] Example (Rank 0 first 10 bins): {seq_counts_per_bin[:10]}", + f"[Sequence Packing] {r:3d} | {total_bins:10d} | {non_empty:9d} | {sequences:9d} | {min_seq:7d} | {max_seq:7d} | {avg_seq:6.1f}", ) - # Show the improvement from round-robin - total_seqs_all_ranks = sum(int(stats[3].item()) for stats in all_rank_stats) - avg_seqs_per_rank = total_seqs_all_ranks / world_size - max_deviation = max( - abs(int(stats[3].item()) - avg_seqs_per_rank) - for stats in all_rank_stats - ) - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] Round-robin distribution quality:", - ) - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] - Average sequences per rank: {avg_seqs_per_rank:.1f}", - ) - log_single_rank( - logger, - logging.INFO, - f"[Sequence Packing] - Max deviation from average: {max_deviation:.0f} sequences ({max_deviation/avg_seqs_per_rank*100:.1f}%)", - ) + # Also show first few bins for rank 0 as example + log_single_rank( + logger, + logging.INFO, + f"[Sequence Packing] Example (Rank 0 first 10 bins): {seq_counts_per_bin[:10]}", + ) + + # Show the improvement from round-robin + total_seqs_all_ranks = sum(int(stats[3].item()) for stats in all_rank_stats) + avg_seqs_per_rank = total_seqs_all_ranks / world_size + max_deviation = max( + abs(int(stats[3].item()) - avg_seqs_per_rank) + for stats in all_rank_stats + ) + log_single_rank( + logger, + logging.INFO, + f"[Sequence Packing] Round-robin distribution quality:", + ) + log_single_rank( + logger, + logging.INFO, + f"[Sequence Packing] - Average sequences per rank: {avg_seqs_per_rank:.1f}", + ) + log_single_rank( + logger, + logging.INFO, + f"[Sequence Packing] - Max deviation from average: {max_deviation:.0f} sequences ({max_deviation/avg_seqs_per_rank*100:.1f}%)", + ) + + result = { + "total_num_bins": int(torch.sum(all_rank_stats_tensor[:, 1]).item()), + "total_non_empty_bins": int(torch.sum(all_rank_stats_tensor[:, 2]).item()), + "total_empty_bins": int(torch.sum(all_rank_stats_tensor[:, 7]).item()), + "total_sequences": int(torch.sum(all_rank_stats_tensor[:, 3]).item()), + } + + return result def get_actual_sequence_lengths(sequences: torch.Tensor, pad_token: int) -> List[int]: """Get actual sequence lengths for pre-padded sequences. @@ -1058,7 +1070,8 @@ def pack_all_trajectories(trajs, generation_masks, inference_logprobs, global_ad cached_packed_seq_params=cached_packed_seq_params, ) - log_packing_efficiency(packing_context) + stats_aggregated_over_all_ranks = log_packing_efficiency(packing_context) + packing_context.stats = stats_aggregated_over_all_ranks return packing_context diff --git a/megatron/training/arguments.py b/megatron/training/arguments.py index 8018cf41174..f23ad9654bc 100644 --- a/megatron/training/arguments.py +++ b/megatron/training/arguments.py @@ -1996,6 +1996,8 @@ def _add_logging_args(parser): help='Path to save the wandb results locally.') group.add_argument('--logging-level', type=int, default=None, help='Set default logging level') + group.add_argument('--log-tokens-per-second', default=False, action="store_true", + help='Whether to log tokens per second.') return parser diff --git a/megatron/training/training.py b/megatron/training/training.py index 2365e909d0f..522521eeb00 100644 --- a/megatron/training/training.py +++ b/megatron/training/training.py @@ -1426,10 +1426,13 @@ def train_step(forward_step_func, data_iterator, model, optimizer, opt_param_sch if args.empty_unused_memory_level >= 2: torch.cuda.empty_cache() + num_empty_bins = 0 if mpu.is_pipeline_last_stage(ignore_virtual=True): # Average loss across microbatches. loss_reduced = {} + num_empty_bins = losses_reduced.pop() + for key in losses_reduced[0].keys(): val = [x[key].view(-1) for x in losses_reduced] if val[0].numel() == 2: @@ -1470,8 +1473,9 @@ def train_step(forward_step_func, data_iterator, model, optimizer, opt_param_sch grad_norm, num_zeros_in_grad, log_max_attention_logit, + num_empty_bins ) - return {}, skipped_iter, should_checkpoint, should_exit, exit_code, grad_norm, num_zeros_in_grad, log_max_attention_logit + return {}, skipped_iter, should_checkpoint, should_exit, exit_code, grad_norm, num_zeros_in_grad, log_max_attention_logit, num_empty_bins def training_log( @@ -1486,6 +1490,7 @@ def training_log( params_norm, num_zeros_in_grad, max_attention_logit, + num_empty_bins, ): """Log training information such as losses, timing, ....""" args = get_args() @@ -1757,6 +1762,8 @@ def training_log( total_loss_dict[skipped_iters_key] ) log_string += ' number of nan iterations: {:3d} |'.format(total_loss_dict[nan_iters_key]) + if args.log_tokens_per_second: + log_string += f' tokens per second: {((batch_size - num_empty_bins) * args.seq_length) / elapsed_time_per_iteration}' total_loss_dict[advanced_iters_key] = 0 total_loss_dict[skipped_iters_key] = 0 total_loss_dict[nan_iters_key] = 0 @@ -2425,6 +2432,7 @@ def get_e2e_base_metrics(): grad_norm, num_zeros_in_grad, max_attention_logit, + num_empty_bins ) = train_step( forward_step_func, train_data_iterator, model, optimizer, opt_param_scheduler, config, forward_backward_func ) @@ -2540,6 +2548,7 @@ def get_e2e_base_metrics(): params_norm, num_zeros_in_grad, max_attention_logit, + num_empty_bins ) # Evaluation. @@ -2721,6 +2730,9 @@ def evaluate( torch.cuda.empty_cache() if mpu.is_pipeline_last_stage(ignore_virtual=True): + + _ = loss_dicts.pop() + # Reduce across processes. for key in loss_dicts[0].keys(): if key not in total_loss_dict: diff --git a/pretrain_gpt.py b/pretrain_gpt.py index 142261e7eee..cbd02ee5ced 100644 --- a/pretrain_gpt.py +++ b/pretrain_gpt.py @@ -130,6 +130,7 @@ def forward_step(data_iterator, model: GPTModel, return_schedule_plan: bool = Fa """ args = get_args() timers = get_timers() + num_empty_bins = 0 # Number of padding bins from the data loader # Get the batch. timers('batch-generator', log_level=2).start() @@ -156,7 +157,7 @@ def forward_step(data_iterator, model: GPTModel, return_schedule_plan: bool = Fa ) # [ModelOpt]: model is needed to access ModelOpt distillation losses - return output_tensor, partial(loss_func, loss_mask, model=model) + return output_tensor, partial(loss_func, loss_mask, model=model), num_empty_bins def is_dataset_built_on_rank(vp_stage=None): diff --git a/tests/unit_tests/export/trtllm/test_distributed_fp8.py b/tests/unit_tests/export/trtllm/test_distributed_fp8.py index cf47a864108..22bf9c977c8 100644 --- a/tests/unit_tests/export/trtllm/test_distributed_fp8.py +++ b/tests/unit_tests/export/trtllm/test_distributed_fp8.py @@ -1,3 +1,5 @@ +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + from functools import partial import pytest @@ -102,7 +104,7 @@ def loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor): loss_mask = data['loss_mask'].to(DEVICE) output_tensor = model(tokens, position_ids, attention_mask, labels=labels) - return output_tensor, partial(loss_func, loss_mask) + return output_tensor, partial(loss_func, loss_mask), None class TestTRTLLMSingleDeviceConverterFP8: diff --git a/tests/unit_tests/export/trtllm/test_single_device_fp8.py b/tests/unit_tests/export/trtllm/test_single_device_fp8.py index 04bbfdb127d..6e75a79d8bf 100644 --- a/tests/unit_tests/export/trtllm/test_single_device_fp8.py +++ b/tests/unit_tests/export/trtllm/test_single_device_fp8.py @@ -1,3 +1,5 @@ +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + from functools import partial import pytest @@ -99,7 +101,7 @@ def _loss_func(loss_mask: torch.Tensor, output_tensor: torch.Tensor): loss_mask = data['loss_mask'].to(DEVICE) output_tensor = model(tokens, position_ids, attention_mask, labels=labels) - return output_tensor, partial(_loss_func, loss_mask) + return output_tensor, partial(_loss_func, loss_mask), None class TestTRTLLMSingleDeviceConverterFP8: diff --git a/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py b/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py index faf83837a23..1a20dbf387c 100644 --- a/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py +++ b/tests/unit_tests/pipeline_parallel/test_pipeline_layout.py @@ -224,7 +224,7 @@ def loss_func(output_tensor: torch.Tensor): loss = output_tensor.sum() return output_tensor, loss - return output_tensor, loss_func + return output_tensor, loss_func, None iteration = 123 layer_spec_fn = get_gpt_decoder_block_spec if is_moe else gpt_te_spec @@ -253,6 +253,7 @@ def loss_func(output_tensor: torch.Tensor): micro_batch_size=1, forward_only=True, ) + losses_reduced.pop() # Empty bins not required for this test optimizer = None opt_param_scheduler = None @@ -293,6 +294,8 @@ def loss_func(output_tensor: torch.Tensor): forward_only=True, ) + losses_reduced_baseline.pop() # Empty bins not required for this test + if parallel_state.is_pipeline_last_stage(ignore_virtual=True): for loss, loss_baseline in zip(losses_reduced, losses_reduced_baseline): assert torch.equal(loss, loss_baseline) diff --git a/tests/unit_tests/pipeline_parallel/test_schedules.py b/tests/unit_tests/pipeline_parallel/test_schedules.py index b861aa2df49..846af3906b5 100644 --- a/tests/unit_tests/pipeline_parallel/test_schedules.py +++ b/tests/unit_tests/pipeline_parallel/test_schedules.py @@ -144,7 +144,7 @@ def forward_step_func(data_iterator, model): def loss_func(output_tensor): return rank, {'loss_reduced': rank} - return model(dummy_data), loss_func + return model(dummy_data), loss_func, None model = torch.nn.Linear(4, 1) model.model_type = 'unit-test' @@ -171,6 +171,8 @@ def set_input_tensor(input_tensor): forward_only=True, ) + losses_reduced.pop() # Empty bins is not used for this test + loss_reduced_expected = [ {'loss_reduced': rank}, {'loss_reduced': rank}, diff --git a/tests/unit_tests/transformer/test_full_cuda_graph.py b/tests/unit_tests/transformer/test_full_cuda_graph.py index 312ae467304..1d92c6edee0 100644 --- a/tests/unit_tests/transformer/test_full_cuda_graph.py +++ b/tests/unit_tests/transformer/test_full_cuda_graph.py @@ -37,7 +37,7 @@ def forward_step_func(data_iterator, model): def loss_func(output_tensor): return rank, {'loss_reduced': rank} - return model(dummy_data), loss_func + return model(dummy_data), loss_func, None model = torch.nn.Linear(4, 1) diff --git a/train_rl.py b/train_rl.py index d767e30401b..1f99ec50eb3 100644 --- a/train_rl.py +++ b/train_rl.py @@ -301,6 +301,11 @@ def forward_step(data_iterator, model: GPTModel, loss_only: bool = False): ) output_tensor = loss + if runtime_state.packing_context: + num_empty_bins = runtime_state.packing_context.stats['total_empty_bins'] + else: + num_empty_bins = None + # loss_mask will not be applied to 0th token as we do not have a logprob for it. return output_tensor, partial( loss_func, @@ -310,7 +315,7 @@ def forward_step(data_iterator, model: GPTModel, loss_only: bool = False): entropy_term, truncated_from_above, truncated_from_below, - ) + ), num_empty_bins def train_valid_test_datasets_provider(train_val_test_num_samples):