From 621b1037e36c519c8f5b109158777e1924e789b6 Mon Sep 17 00:00:00 2001
From: Kashif Rasul
Date: Thu, 19 Jun 2025 17:32:29 +0000
Subject: [PATCH 1/8] intial

---
 pipelinerl/run_finetune.py | 68 +++++++++++---------------------------
 pyproject.toml             |  2 +-
 2 files changed, 21 insertions(+), 49 deletions(-)

diff --git a/pipelinerl/run_finetune.py b/pipelinerl/run_finetune.py
index 30e340c4..bb298ad4 100644
--- a/pipelinerl/run_finetune.py
+++ b/pipelinerl/run_finetune.py
@@ -712,45 +712,9 @@ def batch_generator_fn():
         dist.all_gather(all_samples, local_samples)
         total_samples = sum(int(tensor.item()) for tensor in all_samples)
         do_optimizer_step = total_samples == target_samples
-        using_deepspeed = isinstance(model, deepspeed.DeepSpeedEngine)
-
-        def backward(loss, is_final_micro_batch=False):
-            """Perform backward pass with appropriate gradient accumulation boundary"""
-            if using_deepspeed:
-                # Tell DeepSpeed whether this is a boundary for gradient accumulation
-                model.set_gradient_accumulation_boundary(is_final_micro_batch)
-                # DeepSpeed's backward
-                model.backward(loss)
-            else:
-                # accelerator's backward
-                get_accelerator().backward(loss)
-
-        def optimizer_step_and_zero_grad():
-            """Perform optimizer step and zero gradients"""
-            if using_deepspeed:
-                # Final boundary before optimizer step
-                model.set_gradient_accumulation_boundary(True)
-                model.step()
-                grad_norm = model.get_global_grad_norm() if hasattr(model, "get_global_grad_norm") else None
-                if isinstance(training_metrics.grad_norm, torch.Tensor):
-                    grad_norm = grad_norm.item()
-                training_metrics.grad_norm = grad_norm if grad_norm is not None else -1.0
-            else:
-                max_grad_norm = args.get("gradient_clipping_threshold", None)
-                training_metrics.grad_norm = get_accelerator().clip_grad_norm_(model.parameters(), max_grad_norm)
-                optimizer.step()
-            optimizer.zero_grad()
-
-        @contextlib.contextmanager
-        def toggle_sync(sync: bool):
-            """Wrap accelerate.no_sync() if sync is False."""
-            if sync:
-                yield  # do not enforce no_sync mode
-            else:
-                with get_accelerator().no_sync(model):
-                    yield
-
-        with toggle_sync(do_optimizer_step):
+        # Use Accelerate's accumulate context manager to handle gradient accumulation automatically
+        with get_accelerator().accumulate(model):
             # Choose RL step function based on seq_packing config
             loss, this_step_rl_metrics = rl_step(
                 model, batch, training_metrics.completed_steps, final_train_steps, rl_config
             )
@@ -765,7 +729,21 @@ def toggle_sync(sync: bool):
             training_metrics.lr = optimizer.param_groups[0]["lr"]
 
-            backward(loss, is_final_micro_batch=do_optimizer_step)
+            # Use accelerator's unified backward method
+            get_accelerator().backward(loss)
+
+            # Only perform optimizer step when sync_gradients is True (handled automatically by accumulate)
+            if get_accelerator().sync_gradients:
+                # Clip gradients using accelerator's unified method (handles DeepSpeed/FSDP automatically)
+                max_grad_norm = args.get("gradient_clipping_threshold", None)
+                if max_grad_norm is not None:
+                    grad_norm = get_accelerator().clip_grad_norm_(model.parameters(), max_grad_norm)
+                    training_metrics.grad_norm = grad_norm.item() if isinstance(grad_norm, torch.Tensor) else grad_norm
+                else:
+                    training_metrics.grad_norm = -1.0
+
+                optimizer.step()
+                optimizer.zero_grad()
 
         if not is_sentinel_batch:
             passes_took.append(time.time() - time_before_pass)
@@ -786,16 +764,10 @@ def toggle_sync(sync: bool):
         training_metrics.samples = start_samples + total_samples
         this_worker_tokens = sum(tokens_processed)
         training_metrics.tokens += this_worker_tokens * get_accelerator().state.num_processes
-        try:
-            # Synchronize workers before optimizer step
-            logger.info("Waiting for all workers to synchronize...")
-            torch.cuda.synchronize()  # Ensure CUDA operations are complete
-            get_accelerator().wait_for_everyone()
-            logger.info("All workers synchronized successfully")
-        except Exception as e:
-            logger.warning(f"Synchronization error: {e}. Continuing anyway...")
+
+        # Wait for everyone using accelerator's method
+        get_accelerator().wait_for_everyone()
 
-        optimizer_step_and_zero_grad()
         lr_scheduler.step()
 
         metrics_dict = {}
diff --git a/pyproject.toml b/pyproject.toml
index 7da87c55..2245d939 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,7 +15,7 @@ authors = [
 dependencies = [
     "torch>=2.6",
     "vllm==0.8.3",
-    "accelerate==1.7.0",
+    "accelerate==1.8.0",
     "Tapeagents[finetune]==0.1.15",
     "transformers==4.51.0",
     "flash-attn==2.7.4.post1",

From 95f1e78bbdabed8cfb6ca57e1eb349c6ec93cf3f Mon Sep 17 00:00:00 2001
From: Kashif Rasul
Date: Sun, 22 Jun 2025 18:49:38 +0000
Subject: [PATCH 2/8] undo

---
 pipelinerl/run_finetune.py | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/pipelinerl/run_finetune.py b/pipelinerl/run_finetune.py
index bb298ad4..16650dde 100644
--- a/pipelinerl/run_finetune.py
+++ b/pipelinerl/run_finetune.py
@@ -765,8 +765,14 @@ def batch_generator_fn():
         this_worker_tokens = sum(tokens_processed)
         training_metrics.tokens += this_worker_tokens * get_accelerator().state.num_processes
 
-        # Wait for everyone using accelerator's method
-        get_accelerator().wait_for_everyone()
+        try:
+            # Synchronize workers before optimizer step
+            logger.info("Waiting for all workers to synchronize...")
+            torch.cuda.synchronize()  # Ensure CUDA operations are complete
+            get_accelerator().wait_for_everyone()
+            logger.info("All workers synchronized successfully")
+        except Exception as e:
+            logger.warning(f"Synchronization error: {e}. Continuing anyway...")
 
         lr_scheduler.step()

From e35503bd0dec65803275b0293392a295cafca8ff Mon Sep 17 00:00:00 2001
From: Kashif Rasul
Date: Sun, 22 Jun 2025 18:50:41 +0000
Subject: [PATCH 3/8] formnattng

---
 pipelinerl/run_finetune.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/pipelinerl/run_finetune.py b/pipelinerl/run_finetune.py
index 16650dde..c2d268a8 100644
--- a/pipelinerl/run_finetune.py
+++ b/pipelinerl/run_finetune.py
@@ -764,7 +764,6 @@ def batch_generator_fn():
         training_metrics.samples = start_samples + total_samples
         this_worker_tokens = sum(tokens_processed)
         training_metrics.tokens += this_worker_tokens * get_accelerator().state.num_processes
-
         try:
             # Synchronize workers before optimizer step
             logger.info("Waiting for all workers to synchronize...")

From 2642895e51ff312341a4390d20da0c7935761a3c Mon Sep 17 00:00:00 2001
From: Kashif Rasul
Date: Sun, 22 Jun 2025 19:07:44 +0000
Subject: [PATCH 4/8] fix comments

---
 pipelinerl/run_finetune.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pipelinerl/run_finetune.py b/pipelinerl/run_finetune.py
index c2d268a8..bd44fea0 100644
--- a/pipelinerl/run_finetune.py
+++ b/pipelinerl/run_finetune.py
@@ -713,7 +713,7 @@ def batch_generator_fn():
         total_samples = sum(int(tensor.item()) for tensor in all_samples)
         do_optimizer_step = total_samples == target_samples
-        # Use Accelerate's accumulate context manager to handle gradient accumulation automatically
+        # Perform backward pass with appropriate gradient accumulation boundary
         with get_accelerator().accumulate(model):
             # Choose RL step function based on seq_packing config
             loss, this_step_rl_metrics = rl_step(
@@ -732,9 +732,9 @@ def batch_generator_fn():
             # Use accelerator's unified backward method
             get_accelerator().backward(loss)
 
-            # Only perform optimizer step when sync_gradients is True (handled automatically by accumulate)
+            # Only perform optimizer step when sync_gradients is True
            if get_accelerator().sync_gradients:
-                # Clip gradients using accelerator's unified method (handles DeepSpeed/FSDP automatically)
+                # Clip gradients
                 max_grad_norm = args.get("gradient_clipping_threshold", None)
                 if max_grad_norm is not None:
                     grad_norm = get_accelerator().clip_grad_norm_(model.parameters(), max_grad_norm)

From f709530577db637b47021c112e205861e228c3c4 Mon Sep 17 00:00:00 2001
From: Alex Piche
Date: Wed, 25 Jun 2025 17:32:31 +0000
Subject: [PATCH 5/8] rm peft

---
 pipelinerl/finetune/checkpoints.py |  39 +------
 pipelinerl/finetune/lora.py        | 165 -----------------------------
 pipelinerl/finetune/optim.py       |   3 +-
 3 files changed, 3 insertions(+), 204 deletions(-)
 delete mode 100644 pipelinerl/finetune/lora.py

diff --git a/pipelinerl/finetune/checkpoints.py b/pipelinerl/finetune/checkpoints.py
index d07a0e55..be448299 100644
--- a/pipelinerl/finetune/checkpoints.py
+++ b/pipelinerl/finetune/checkpoints.py
@@ -19,7 +19,6 @@
 from transformers.models.auto.modeling_auto import _BaseAutoModelClass
 
 from .context import get_accelerator, logger
-from .lora import has_lora_checkpoint, lora_load, lora_save, prepare_lora_model
 from .types import ModelClass, TrainingMetrics
@@ -101,23 +100,6 @@ def load_model(args, model_class, current_dir):
     if args.load_as_bf16:
         loading_args["torch_dtype"] = torch.bfloat16
-    if args.lora.enabled:
-        if is_ds_zero_3:
-            raise Exception("LoRA is not compatible with Deepspeed zero stage 3")
-        if args.lora.base_model_8bit:
-            loading_args["quantization_config"] = BitsAndBytesConfig(
-                load_in_4bit=False,
-                load_in_8bit=True,
-                llm_int8_has_fp16_weight=args.load_as_bf16,
-            )
-        elif args.lora.base_model_4bit:
-            loading_args["quantization_config"] = BitsAndBytesConfig(
-                load_in_4bit=True,
-                load_in_8bit=False,
-                bnb_4bit_quant_type="nf4",
-                bnb_4bit_use_double_quant=False,
-                bnb_4bit_compute_dtype=torch.bfloat16,
-            )
     if args.auto_device_map:
         loading_args["device_map"] = "auto"
     model_cls = get_auto_model_class(model_class)
@@ -130,8 +112,7 @@ def load_model(args, model_class, current_dir):
         # Size mismatch errors here may be due to improper used of Deepspeed+save_pretrained()
         # instead, always call save_model_only() in all processes
-        # when LoRA enabled, always preload the original model, the lora weights will be loaded later
-        model_to_load = args.config_name if args.lora.enabled else str(current_dir)
+        model_to_load = args.config_name
         logger.info(f"Loading model {model_cls} weights from {current_dir}")
     else:  # from scratch
         logger.info(f"Initializing model {model_cls} from {args.config_name}")
@@ -139,15 +120,6 @@ def load_model(args, model_class, current_dir):
     logger.info(f"Loading args: {loading_args}")
     model = model_cls.from_pretrained(model_to_load, **loading_args)
-    if args.lora.enabled:
-        model = prepare_lora_model(args.lora, model, args.gradient_checkpointing)
-        if has_lora_checkpoint(current_dir):
-            lora_load(current_dir, model)
-    elif args.gradient_checkpointing:
-        model.gradient_checkpointing_enable(
-            gradient_checkpointing_kwargs={"use_reentrant": args.reentrant_checkpointing}
-        )
-
     get_accelerator().wait_for_everyone()
     return model
@@ -300,7 +272,6 @@ def save_model_and_tokenizer(
     output_dir: Path,
     model: transformers.PreTrainedModel,
     tokenizer: transformers.PreTrainedTokenizer | transformers.PreTrainedTokenizerFast,
-    lora: bool = False,
     safe_serialization: bool = False,
 ):
     logger.info("Saving model and tokenizer")
@@ -309,7 +280,6 @@ def save_model_and_tokenizer(
         temp_dir,
         model,
         unwrap=True,
-        lora=lora,
         safe_serialization=safe_serialization,
     )
     save_tokenizer_only(temp_dir, tokenizer)
@@ -319,7 +289,6 @@ def save_model_only(
     output_dir: Path,
     model,
     unwrap: bool = True,
-    lora: bool = False,
     safe_serialization: bool = False,
 ):
     """
@@ -344,12 +313,8 @@ def save_model_only(
     logger.info(f"Save model to {output_dir}")
     unwrapped_model = get_accelerator().unwrap_model(model) if unwrap else model
-    if lora:
-        lora_save(output_dir, unwrapped_model)
-        return
-
     # for non-deepspeed models
-    elif isinstance(unwrapped_model, transformers.PreTrainedModel):
+    if isinstance(unwrapped_model, transformers.PreTrainedModel):
         logger.info("Saving model using transformers save_pretrained")
         unwrapped_model.save_pretrained(  # type: ignore
             output_dir,
diff --git a/pipelinerl/finetune/lora.py b/pipelinerl/finetune/lora.py
deleted file mode 100644
index 81011be0..00000000
--- a/pipelinerl/finetune/lora.py
+++ /dev/null
@@ -1,165 +0,0 @@
-import json
-import logging
-import os
-import sys
-from pathlib import Path
-
-import torch
-from peft.mapping import get_peft_model
-from peft.peft_model import PeftModel
-from peft.tuners.lora import LoraConfig
-from peft.utils.other import prepare_model_for_kbit_training
-from peft.utils.save_and_load import set_peft_model_state_dict
-from safetensors.torch import load_file
-from transformers import AutoModelForCausalLM, AutoTokenizer
-
-logger = logging.getLogger(__name__)
-
-
-def has_lora_checkpoint(current_dir: Path) -> bool:
-    return os.path.exists(current_dir / "adapter_model.bin") or os.path.exists(
-        current_dir / "adapter_model.safetensors"
-    )
-
-
-def prepare_model_for_bf16_training(model, use_gradient_checkpointing=False, gradient_checkpointing_kwargs=None):
-    logger.info("Prepare LoRA for BF16 training")
-    for _, param in model.named_parameters():
-        # freeze base model's layers
-        param.requires_grad = False
-
-    for name, param in model.named_parameters():
-        # upcast LM head and layernorms
-        if any([k in name for k in ["lm_head", "wte", "ln_"]]):
-            param.data = param.data.to(torch.float32)
-
-    if use_gradient_checkpointing:
-        # For backward compatibility
-        if hasattr(model, "enable_input_require_grads"):
-            model.enable_input_require_grads()
-        else:
-
-            def make_inputs_require_grad(module, input, output):
-                output.requires_grad_(True)
-
-            model.get_input_embeddings().register_forward_hook(make_inputs_require_grad)
-        model.gradient_checkpointing_enable()
-
-    return model
-
-
-def prepare_lora_model(lora_config, model, gradient_checkpointing) -> PeftModel:
-    logger.info("Prepare LoRA adapters")
-    lora_prepare_fn = (
-        prepare_model_for_kbit_training
-        if lora_config.base_model_8bit or lora_config.base_model_4bit
-        else prepare_model_for_bf16_training
-    )
-    all_params = model.num_parameters()
-    model = lora_prepare_fn(
-        model, use_gradient_checkpointing=gradient_checkpointing, gradient_checkpointing_kwargs={"use_reentrant": True}
-    )
-    lora_config = LoraConfig(
-        inference_mode=False,
-        r=lora_config.r,
-        lora_alpha=lora_config.alpha,
-        lora_dropout=lora_config.dropout,
-        bias=lora_config.bias,
-        task_type=lora_config.task_type,
-        target_modules=list(lora_config.target_modules),
-    )
-    model = get_peft_model(model, lora_config)
-    trainable_params = model.num_parameters(only_trainable=True)
-    logger.info(
-        f"LoRA trainable params: {trainable_params:_} of {all_params:_} || trainable%: {100 * trainable_params / all_params:.2f}"
-    )
-    return model  # type: ignore
-
-
-def is_lora_checkpoint(lora_model_path):
-    lora_model_config = os.path.join(lora_model_path, "adapter_config.json")
-    return os.path.exists(lora_model_config)
-
-
-def get_base_model_name(lora_model_path):
-    assert is_lora_checkpoint(lora_model_path)
-    lora_model_config = os.path.join(lora_model_path, "adapter_config.json")
-    with open(lora_model_config) as f:
-        lora_config = json.load(f)
-    return lora_config["base_model_name_or_path"]
-
-
-def lora_load_and_merge(lora_model_path, **kwargs):
-    base_model_name_or_path = get_base_model_name(lora_model_path)
-    logger.info(f"Load base model {base_model_name_or_path}")
-    base_model = AutoModelForCausalLM.from_pretrained(base_model_name_or_path, **kwargs)
-    model = PeftModel.from_pretrained(base_model, lora_model_path)
-    logger.info(f"Merge adapters from {lora_model_path}")
-    return model.merge_and_unload()  # type: ignore
-
-
-def lora_save(checkpoint_folder, model):
-    model.save_pretrained(checkpoint_folder)
-    logger.info(f"Saved LoRA model to {checkpoint_folder}")
-
-
-def lora_load(checkpoint_folder, model):
-    adapter_path = os.path.join(checkpoint_folder, "adapter_model.safetensors")
-    if os.path.exists(adapter_path):
-        adapters_weights = load_file(adapter_path)
-    else:
-        adapter_path = os.path.join(checkpoint_folder, "adapter_model.bin")
-        adapters_weights = torch.load(adapter_path)
-    set_peft_model_state_dict(model, adapters_weights)
-    logger.info(f"Loaded LoRA model from {checkpoint_folder}.")
-    return model
-
-
-def apply_lora(model, lora_model_path):
-    lora_model_config = os.path.join(lora_model_path, "adapter_config.json")
-    assert os.path.exists(lora_model_config)
-    with open(lora_model_config) as f:
-        lora_config = json.load(f)
-    scaling = lora_config["lora_alpha"] / lora_config["r"]
-    lora_model_file = os.path.join(lora_model_path, "adapter_model.bin")
-    assert os.path.exists(lora_model_file)
-    lora_weights = torch.load(lora_model_file)
-    for layer_name, layer_weights in model.named_parameters():
-        lora_a = None
-        lora_b = None
-        for k, v in lora_weights.items():
-            if layer_name in k:
-                if "lora_A" in k:
-                    lora_a = v
-                elif "lora_B" in k:
-                    lora_b = v
-        if lora_a is not None and lora_b is not None:
-            wdiff = (lora_b @ lora_a) * scaling
-            layer_weights.data += wdiff
-            break
-
-
-def merge_lora(lora_model_path):
-    if lora_model_path[-1] == "/":
-        lora_model_path = lora_model_path[:-1]
-    assert os.path.isdir(lora_model_path), f"{lora_model_path} is not a dir"
-    lora_model_config = os.path.join(lora_model_path, "adapter_config.json")
-    assert os.path.exists(lora_model_config), f"{lora_model_config} does not exists"
-
-    logger.info(f"Merge lora checkpoint {lora_model_path}")
-    model = lora_load_and_merge(lora_model_path, torch_dtype=torch.bfloat16, low_cpu_mem_usage=True)
-    tokenizer = AutoTokenizer.from_pretrained(lora_model_path)
-
-    tmp_dir = f"{lora_model_path}_merged"
-    logger.info(f"Save merged model to {tmp_dir}")
-    model.save_pretrained(tmp_dir, safe_serialization=True)
-    tokenizer.save_pretrained(tmp_dir)
-
-    os.rename(lora_model_path, f"{lora_model_path}_lora")
-    os.rename(tmp_dir, lora_model_path)
-    logger.info(f"Merged model saved to {lora_model_path}")
-
-
-if __name__ == "__main__":
-    assert len(sys.argv) == 2, "Merging lora weights: python lora.py "
-    merge_lora(sys.argv[1])
diff --git a/pipelinerl/finetune/optim.py b/pipelinerl/finetune/optim.py
index 268e88cf..bf636f4b 100644
--- a/pipelinerl/finetune/optim.py
+++ b/pipelinerl/finetune/optim.py
@@ -1,12 +1,11 @@
 import torch
-from peft.peft_model import PeftModel
 from torch.optim.adamw import AdamW
 from torch.optim.optimizer import Optimizer
 from transformers import Adafactor, PreTrainedModel
 
 
 def get_grouped_params(
-    model: PreTrainedModel | PeftModel,
+    model: PreTrainedModel,
     weight_decay: float,
     no_decay: list[str] = ["bias", "LayerNorm.weight"],
 ):

From cf8c23d8b7110ba307f019e48eb8a8b80f8e010d Mon Sep 17 00:00:00 2001
From: Alex Piche
Date: Wed, 25 Jun 2025 18:21:57 +0000
Subject: [PATCH 6/8] empty cuda cache

---
 pipelinerl/finetune_loop.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py
index bd44fea0..3b79c6b3 100644
--- a/pipelinerl/finetune_loop.py
+++ b/pipelinerl/finetune_loop.py
@@ -744,6 +744,7 @@ def batch_generator_fn():
 
                 optimizer.step()
                 optimizer.zero_grad()
+                torch.cuda.empty_cache()
 
         if not is_sentinel_batch:
             passes_took.append(time.time() - time_before_pass)

From 62845a5a797220db7b7846323ec87a1f9ce75dd4 Mon Sep 17 00:00:00 2001
From: Alex Piche
Date: Wed, 25 Jun 2025 19:25:22 +0000
Subject: [PATCH 7/8] upd pyproject.toml

---
 pyproject.toml | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 2245d939..59251b76 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -15,14 +15,16 @@ authors = [
 dependencies = [
     "torch>=2.6",
     "vllm==0.8.3",
+    "Tapeagents==0.1.15",
+    "deepspeed~=0.15.4",
     "accelerate==1.8.0",
-    "Tapeagents[finetune]==0.1.15",
     "transformers==4.51.0",
     "flash-attn==2.7.4.post1",
     "math-verify[antlr4_9_3]==0.7.0",
     "orjson==3.10.16",
     "redis==5.2.1",
     "hydra-core>=1.3.2",
+    "wandb~=0.19",
 ]
 
 [tool.setuptools.packages.find]

From 2da0e3b0e7c9e061cc71ceefe728a47cf0ccc60b Mon Sep 17 00:00:00 2001
From: Kashif Rasul
Date: Wed, 25 Jun 2025 19:57:51 +0000
Subject: [PATCH 8/8] Optimizer steps should be outside accelerate.accumulate(model)

---
 pipelinerl/finetune_loop.py | 28 ++++++++++++++--------------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/pipelinerl/finetune_loop.py b/pipelinerl/finetune_loop.py
index 3b79c6b3..97780e22 100644
--- a/pipelinerl/finetune_loop.py
+++ b/pipelinerl/finetune_loop.py
@@ -731,20 +731,20 @@ def batch_generator_fn():
             # Use accelerator's unified backward method
             get_accelerator().backward(loss)
-
-            # Only perform optimizer step when sync_gradients is True
-            if get_accelerator().sync_gradients:
-                # Clip gradients
-                max_grad_norm = args.get("gradient_clipping_threshold", None)
-                if max_grad_norm is not None:
-                    grad_norm = get_accelerator().clip_grad_norm_(model.parameters(), max_grad_norm)
-                    training_metrics.grad_norm = grad_norm.item() if isinstance(grad_norm, torch.Tensor) else grad_norm
-                else:
-                    training_metrics.grad_norm = -1.0
-
-                optimizer.step()
-                optimizer.zero_grad()
-                torch.cuda.empty_cache()
+
+        # Only perform optimizer step when sync_gradients is True
+        if get_accelerator().sync_gradients:
+            # Clip gradients
+            max_grad_norm = args.get("gradient_clipping_threshold", None)
+            if max_grad_norm is not None:
+                grad_norm = get_accelerator().clip_grad_norm_(model.parameters(), max_grad_norm)
+                training_metrics.grad_norm = grad_norm.item() if isinstance(grad_norm, torch.Tensor) else grad_norm
+            else:
+                training_metrics.grad_norm = -1.0
+
+            optimizer.step()
+            optimizer.zero_grad()
+            torch.cuda.empty_cache()
 
         if not is_sentinel_batch:
             passes_took.append(time.time() - time_before_pass)
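
Taken together, patches 1, 4, 6 and 8 land on the standard Accelerate gradient-accumulation pattern: backward() runs inside accelerator.accumulate(model), while gradient clipping and the optimizer step are gated on accelerator.sync_gradients and sit outside the accumulate block. The following is only a minimal sketch of that pattern; the toy model, optimizer, dataloader, gradient_accumulation_steps=4 and the 1.0 clipping threshold are illustrative assumptions, not values taken from the PipelineRL code or configs.

import torch
from accelerate import Accelerator
from torch.utils.data import DataLoader, TensorDataset

# Illustrative values only: not taken from PipelineRL's configuration.
accelerator = Accelerator(gradient_accumulation_steps=4)

model = torch.nn.Linear(16, 1)  # stand-in for the finetuned model
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
dataset = TensorDataset(torch.randn(64, 16), torch.randn(64, 1))
dataloader = DataLoader(dataset, batch_size=8)

model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

for inputs, targets in dataloader:
    # Inside accumulate(), backward() skips gradient synchronization on
    # non-boundary micro-batches (the job of the old toggle_sync/no_sync wrapper).
    with accelerator.accumulate(model):
        loss = torch.nn.functional.mse_loss(model(inputs), targets)
        accelerator.backward(loss)

    # sync_gradients is True only on an accumulation boundary, so clipping and
    # the optimizer step run once per effective batch, outside accumulate().
    if accelerator.sync_gradients:
        grad_norm = accelerator.clip_grad_norm_(model.parameters(), 1.0)  # assumed threshold
        optimizer.step()
        optimizer.zero_grad()

Because sync_gradients flips to True only on accumulation boundaries, the clip/step/zero_grad trio runs once per effective batch, which is what replaces the earlier manual toggle_sync/no_sync wrapper and the DeepSpeed gradient-accumulation-boundary bookkeeping removed in patch 1.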