From d405a80a08437436c3be97266eb189ee46db0944 Mon Sep 17 00:00:00 2001 From: Mamta Singh Date: Mon, 23 Jun 2025 13:24:27 +0000 Subject: [PATCH 1/3] update branch Signed-off-by: Mamta Singh --- QEfficient/cloud/finetune.py | 45 +++++--- QEfficient/finetune/configs/training.py | 4 + QEfficient/finetune/dataset/custom_dataset.py | 19 ++-- .../finetune/dataset/grammar_dataset.py | 12 +- QEfficient/finetune/eval.py | 12 +- QEfficient/finetune/utils/config_utils.py | 4 +- QEfficient/finetune/utils/dataset_utils.py | 3 +- QEfficient/finetune/utils/logging_utils.py | 57 ++++++++++ QEfficient/finetune/utils/plot_metrics.py | 6 +- QEfficient/finetune/utils/train_utils.py | 105 ++++++++---------- 10 files changed, 165 insertions(+), 102 deletions(-) create mode 100644 QEfficient/finetune/utils/logging_utils.py diff --git a/QEfficient/cloud/finetune.py b/QEfficient/cloud/finetune.py index 1e0dc48bc..e0563d11b 100644 --- a/QEfficient/cloud/finetune.py +++ b/QEfficient/cloud/finetune.py @@ -17,7 +17,7 @@ import torch.utils.data from peft import PeftModel, get_peft_model from torch.optim.lr_scheduler import StepLR -from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer +from transformers import AutoModel, AutoModelForCausalLM, AutoModelForSequenceClassification, AutoTokenizer from QEfficient.finetune.configs.training import TrainConfig from QEfficient.finetune.utils.config_utils import ( @@ -26,18 +26,22 @@ update_config, ) from QEfficient.finetune.utils.dataset_utils import get_dataloader +from QEfficient.finetune.utils.logging_utils import logger from QEfficient.finetune.utils.parser import get_finetune_parser -from QEfficient.finetune.utils.train_utils import get_longest_seq_length, print_model_size, train -from QEfficient.utils._utils import login_and_download_hf_lm +from QEfficient.finetune.utils.train_utils import ( + get_longest_seq_length, + print_model_size, + print_trainable_parameters, + train, +) +from QEfficient.utils._utils import hf_download # Try importing QAIC-specific module, proceed without it if unavailable try: import torch_qaic # noqa: F401 except ImportError as e: - print(f"Warning: {e}. Proceeding without QAIC modules.") - + logger.log_rank_zero(f"{e}. Moving ahead without these qaic modules.") -from transformers import AutoModelForSequenceClassification # Suppress all warnings warnings.filterwarnings("ignore") @@ -106,7 +110,8 @@ def load_model_and_tokenizer( - Resizes model embeddings if tokenizer vocab size exceeds model embedding size. - Sets pad_token_id to eos_token_id if not defined in the tokenizer. 
""" - pretrained_model_path = login_and_download_hf_lm(train_config.model_name) + logger.log_rank_zero(f"Loading HuggingFace model for {train_config.model_name}") + pretrained_model_path = hf_download(train_config.model_name) if train_config.task_type == "seq_classification": model = AutoModelForSequenceClassification.from_pretrained( pretrained_model_path, @@ -116,7 +121,7 @@ def load_model_and_tokenizer( ) if not hasattr(model, "base_model_prefix"): - raise RuntimeError("Given huggingface model does not have 'base_model_prefix' attribute.") + logger.raise_runtimeerror("Given huggingface model does not have 'base_model_prefix' attribute.") for param in getattr(model, model.base_model_prefix).parameters(): param.requires_grad = False @@ -141,11 +146,10 @@ def load_model_and_tokenizer( # If there is a mismatch between tokenizer vocab size and embedding matrix, # throw a warning and then expand the embedding matrix if len(tokenizer) > model.get_input_embeddings().weight.shape[0]: - print("WARNING: Resizing embedding matrix to match tokenizer vocab size.") + logger.log_rank_zero("Resizing the embedding matrix to match the tokenizer vocab size.", logger.WARNING) model.resize_token_embeddings(len(tokenizer)) - # FIXME (Meet): Cover below line inside the logger once it is implemented. - print_model_size(model, train_config) + print_model_size(model) # Note: Need to call this before calling PeftModel.from_pretrained or get_peft_model. # Because, both makes model.is_gradient_checkpointing = True which is used in peft library to @@ -157,7 +161,9 @@ def load_model_and_tokenizer( if hasattr(model, "supports_gradient_checkpointing") and model.supports_gradient_checkpointing: model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"preserve_rng_state": False}) else: - raise RuntimeError("Given model doesn't support gradient checkpointing. Please disable it and run it.") + logger.raise_runtimeerror( + "Given model doesn't support gradient checkpointing. Please disable it and run it." + ) model = apply_peft(model, train_config, peft_config_file, **kwargs) @@ -192,7 +198,7 @@ def apply_peft( else: peft_config = generate_peft_config(train_config, peft_config_file, **kwargs) model = get_peft_model(model, peft_config) - model.print_trainable_parameters() + print_trainable_parameters(model) return model @@ -217,7 +223,7 @@ def setup_dataloaders( - Length of longest sequence in the dataset. Raises: - ValueError: If validation is enabled but the validation set is too small. + RuntimeError: If validation is enabled but the validation set is too small. Notes: - Applies a custom data collator if provided by get_custom_data_collator. @@ -225,17 +231,17 @@ def setup_dataloaders( """ train_dataloader = get_dataloader(tokenizer, dataset_config, train_config, split="train") - print(f"--> Num of Training Set Batches loaded = {len(train_dataloader)}") + logger.log_rank_zero(f"Number of Training Set Batches loaded = {len(train_dataloader)}") eval_dataloader = None if train_config.run_validation: eval_dataloader = get_dataloader(tokenizer, dataset_config, train_config, split="val") if len(eval_dataloader) == 0: - raise ValueError( + logger.raise_runtimeerror( f"The eval set size is too small for dataloader to load even one batch. Please increase the size of eval set. 
({len(eval_dataloader)=})" ) else: - print(f"--> Num of Validation Set Batches loaded = {len(eval_dataloader)}") + logger.log_rank_zero(f"Number of Validation Set Batches loaded = {len(eval_dataloader)}") longest_seq_length, _ = get_longest_seq_length( torch.utils.data.ConcatDataset([train_dataloader.dataset, eval_dataloader.dataset]) @@ -274,13 +280,16 @@ def main(peft_config_file: str = None, **kwargs) -> None: dataset_config = generate_dataset_config(train_config.dataset) update_config(dataset_config, **kwargs) + logger.prepare_dump_logs(train_config.dump_logs) + logger.setLevel(train_config.log_level) + setup_distributed_training(train_config) setup_seeds(train_config.seed) model, tokenizer = load_model_and_tokenizer(train_config, dataset_config, peft_config_file, **kwargs) # Create DataLoaders for the training and validation dataset train_dataloader, eval_dataloader, longest_seq_length = setup_dataloaders(train_config, dataset_config, tokenizer) - print( + logger.log_rank_zero( f"The longest sequence length in the train data is {longest_seq_length}, " f"passed context length is {train_config.context_length} and overall model's context length is " f"{model.config.max_position_embeddings}" diff --git a/QEfficient/finetune/configs/training.py b/QEfficient/finetune/configs/training.py index deac537bc..272819120 100644 --- a/QEfficient/finetune/configs/training.py +++ b/QEfficient/finetune/configs/training.py @@ -5,6 +5,7 @@ # # ----------------------------------------------------------------------------- +import logging from dataclasses import dataclass @@ -96,3 +97,6 @@ class TrainConfig: dump_root_dir: str = "mismatches/step_" opByOpVerifier: bool = False + + dump_logs: bool = True + log_level: str = logging.INFO diff --git a/QEfficient/finetune/dataset/custom_dataset.py b/QEfficient/finetune/dataset/custom_dataset.py index 6d9baf90d..757f32e01 100644 --- a/QEfficient/finetune/dataset/custom_dataset.py +++ b/QEfficient/finetune/dataset/custom_dataset.py @@ -8,6 +8,8 @@ import importlib from pathlib import Path +from QEfficient.finetune.utils.logging_utils import logger + def load_module_from_py_file(py_file: str) -> object: """ @@ -30,20 +32,19 @@ def get_custom_dataset(dataset_config, tokenizer, split: str, context_length=Non module_path, func_name = dataset_config.file, "get_custom_dataset" if not module_path.endswith(".py"): - raise ValueError(f"Dataset file {module_path} is not a .py file.") + logger.raise_runtimeerror(f"Dataset file {module_path} is not a .py file.") module_path = Path(module_path) if not module_path.is_file(): - raise FileNotFoundError(f"Dataset py file {module_path.as_posix()} does not exist or is not a file.") + logger.raise_runtimeerror(f"Dataset py file {module_path.as_posix()} does not exist or is not a file.") module = load_module_from_py_file(module_path.as_posix()) try: return getattr(module, func_name)(dataset_config, tokenizer, split, context_length) - except AttributeError as e: - print( + except AttributeError: + logger.raise_runtimeerror( f"It seems like the given method name ({func_name}) is not present in the dataset .py file ({module_path.as_posix()})." 
) - raise e def get_data_collator(dataset_processer, dataset_config): @@ -53,16 +54,16 @@ def get_data_collator(dataset_processer, dataset_config): module_path, func_name = dataset_config.file, "get_data_collator" if not module_path.endswith(".py"): - raise ValueError(f"Dataset file {module_path} is not a .py file.") + logger.raise_runtimeerror(f"Dataset file {module_path} is not a .py file.") module_path = Path(module_path) if not module_path.is_file(): - raise FileNotFoundError(f"Dataset py file {module_path.as_posix()} does not exist or is not a file.") + logger.raise_runtimeerror(f"Dataset py file {module_path.as_posix()} does not exist or is not a file.") module = load_module_from_py_file(module_path.as_posix()) try: return getattr(module, func_name)(dataset_processer) except AttributeError: - print(f"Can not find the custom data_collator in the dataset.py file ({module_path.as_posix()}).") - print("Using the default data_collator instead.") + logger.info(f"Can not find the custom data_collator in the dataset.py file ({module_path.as_posix()}).") + logger.info("Using the default data_collator instead.") return None diff --git a/QEfficient/finetune/dataset/grammar_dataset.py b/QEfficient/finetune/dataset/grammar_dataset.py index 43ee39158..1261f4d93 100644 --- a/QEfficient/finetune/dataset/grammar_dataset.py +++ b/QEfficient/finetune/dataset/grammar_dataset.py @@ -10,6 +10,8 @@ from datasets import load_dataset from torch.utils.data import Dataset +from QEfficient.finetune.utils.logging_utils import logger + class grammar(Dataset): def __init__(self, tokenizer, csv_name=None, context_length=None): @@ -20,8 +22,8 @@ def __init__(self, tokenizer, csv_name=None, context_length=None): delimiter=",", ) except Exception as e: - print( - "Loading of grammar dataset failed! Please see [here](https://github.com/meta-llama/llama-recipes/blob/main/src/llama_recipes/datasets/grammar_dataset/grammar_dataset_process.ipynb) for details on how to download the dataset." + logger.raise_runtimeerror( + "Loading of grammar dataset failed! Please check (https://github.com/meta-llama/llama-recipes/blob/main/src/llama_recipes/datasets/grammar_dataset/grammar_dataset_process.ipynb) for details on how to download the dataset." 
) raise e @@ -36,7 +38,7 @@ def convert_to_features(self, example_batch): # Create prompt and tokenize contexts and questions if self.print_text: - print("Input Text: ", self.clean_text(example_batch["text"])) + logger.info("Input Text: ", self.clean_text(example_batch["text"])) input_ = example_batch["input"] target_ = example_batch["target"] @@ -71,9 +73,9 @@ def get_dataset(dataset_config, tokenizer, csv_name=None, context_length=None): """cover function for handling loading the working dataset""" """dataset loading""" currPath = Path.cwd() / "datasets_grammar" / "grammar_train.csv" - print(f"Loading dataset {currPath}") + logger.info(f"Loading dataset {currPath}") csv_name = str(currPath) - print(csv_name) + logger.info(csv_name) dataset = grammar(tokenizer=tokenizer, csv_name=csv_name, context_length=context_length) return dataset diff --git a/QEfficient/finetune/eval.py b/QEfficient/finetune/eval.py index c0d29d38b..4ea6b70f3 100644 --- a/QEfficient/finetune/eval.py +++ b/QEfficient/finetune/eval.py @@ -19,13 +19,14 @@ from utils.train_utils import evaluation, print_model_size from QEfficient.finetune.configs.training import TrainConfig +from QEfficient.finetune.utils.logging_utils import logger try: import torch_qaic # noqa: F401 device = "qaic:0" except ImportError as e: - print(f"Warning: {e}. Moving ahead without these qaic modules.") + logger.warning(f"{e}. Moving ahead without these qaic modules.") device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # Suppress all warnings @@ -77,10 +78,10 @@ def main(**kwargs): # If there is a mismatch between tokenizer vocab size and embedding matrix, # throw a warning and then expand the embedding matrix if len(tokenizer) > model.get_input_embeddings().weight.shape[0]: - print("WARNING: Resizing the embedding matrix to match the tokenizer vocab size.") + logger.warning("Resizing the embedding matrix to match the tokenizer vocab size.") model.resize_token_embeddings(len(tokenizer)) - print_model_size(model, train_config) + print_model_size(model) if train_config.run_validation: # TODO: vbaddi enable packing later in entire infra. @@ -88,14 +89,13 @@ def main(**kwargs): # dataset_val = ConcatDataset(dataset_val, chunk_size=train_config.context_length) eval_dataloader = get_dataloader(tokenizer, dataset_config, train_config, split="test") - - print(f"--> Num of Validation Set Batches loaded = {len(eval_dataloader)}") + logger.log_rank_zero(f"Num of Validation Set Batches loaded = {len(eval_dataloader)}") if len(eval_dataloader) == 0: raise ValueError( f"The eval set size is too small for dataloader to load even one batch. Please increase the size of eval set. 
({len(eval_dataloader)=})" ) else: - print(f"--> Num of Validation Set Batches loaded = {len(eval_dataloader)}") + logger.log_rank_zero(f"Num of Validation Set Batches loaded = {len(eval_dataloader)}") model.to(device) _ = evaluation(model, train_config, eval_dataloader, None, tokenizer, device) diff --git a/QEfficient/finetune/utils/config_utils.py b/QEfficient/finetune/utils/config_utils.py index bdc3c0429..7e8cc6d85 100644 --- a/QEfficient/finetune/utils/config_utils.py +++ b/QEfficient/finetune/utils/config_utils.py @@ -18,6 +18,7 @@ from QEfficient.finetune.configs.peft_config import LoraConfig from QEfficient.finetune.configs.training import TrainConfig from QEfficient.finetune.dataset.dataset_config import DATASET_PREPROC +from QEfficient.finetune.utils.logging_utils import logger def update_config(config, **kwargs): @@ -46,8 +47,7 @@ def update_config(config, **kwargs): raise ValueError(f"Config '{config_name}' does not have parameter: '{param_name}'") else: config_type = type(config).__name__ - # FIXME (Meet): Once logger is available put this in debug level. - print(f"[WARNING]: Unknown parameter '{k}' for config type '{config_type}'") + logger.debug(f"Unknown parameter '{k}' for config type '{config_type}'") def generate_peft_config(train_config: TrainConfig, peft_config_file: str = None, **kwargs) -> Any: diff --git a/QEfficient/finetune/utils/dataset_utils.py b/QEfficient/finetune/utils/dataset_utils.py index 42d0aae71..de9163175 100644 --- a/QEfficient/finetune/utils/dataset_utils.py +++ b/QEfficient/finetune/utils/dataset_utils.py @@ -11,6 +11,7 @@ from QEfficient.finetune.data.sampler import DistributedLengthBasedBatchSampler from QEfficient.finetune.dataset.dataset_config import DATALOADER_COLLATE_FUNC, DATASET_PREPROC +from QEfficient.finetune.utils.logging_utils import logger def get_preprocessed_dataset( @@ -72,7 +73,7 @@ def get_dataloader(tokenizer, dataset_config, train_config, split: str = "train" print("custom_data_collator is used") dl_kwargs["collate_fn"] = custom_data_collator - print(f"length of dataset_{split}", len(dataset)) + logger.log_rank_zero(f"Length of {split} dataset is {len(dataset)}") # Create data loader dataloader = torch.utils.data.DataLoader( diff --git a/QEfficient/finetune/utils/logging_utils.py b/QEfficient/finetune/utils/logging_utils.py new file mode 100644 index 000000000..86dcf1e59 --- /dev/null +++ b/QEfficient/finetune/utils/logging_utils.py @@ -0,0 +1,57 @@ +# ----------------------------------------------------------------------------- +# +# Copyright (c) Qualcomm Technologies, Inc. and/or its subsidiaries. 
+# SPDX-License-Identifier: BSD-3-Clause +# +# ----------------------------------------------------------------------------- + +import logging +import os +from datetime import datetime + +import torch.distributed as dist + +from QEfficient.utils.constants import ROOT_DIR + + +class FTLogger: + def __init__(self, level=logging.DEBUG): + self.logger = logging.getLogger("QEfficient") + if not getattr(self.logger, "_custom_methods_added", False): + self._bind_custom_methods() + self.logger._custom_methods_added = True # Prevent adding handlers/methods twice + + def _bind_custom_methods(self): + def raise_runtimeerror(message): + self.logger.error(message) + raise RuntimeError(message) + + def log_rank_zero(msg: str, level: int = logging.INFO): + rank = dist.get_rank() if dist.is_available() and dist.is_initialized() else 0 + if rank != 0: + return + self.logger.log(level, msg, stacklevel=2) + + def prepare_dump_logs(dump_logs=False, level=logging.INFO): + if dump_logs: + logs_path = os.path.join(ROOT_DIR, "logs") + if not os.path.exists(logs_path): + os.makedirs(logs_path, exist_ok=True) + file_name = f"log-file-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" + ".txt" + log_file = os.path.join(logs_path, file_name) + + fh = logging.FileHandler(log_file) + fh.setLevel(level) + formatter = logging.Formatter("%(levelname)s - %(name)s - %(message)s") + fh.setFormatter(formatter) + self.logger.addHandler(fh) + + self.logger.raise_runtimeerror = raise_runtimeerror + self.logger.log_rank_zero = log_rank_zero + self.logger.prepare_dump_logs = prepare_dump_logs + + def get_logger(self): + return self.logger + + +logger = FTLogger().get_logger() diff --git a/QEfficient/finetune/utils/plot_metrics.py b/QEfficient/finetune/utils/plot_metrics.py index 416ec3cdf..e91828d83 100644 --- a/QEfficient/finetune/utils/plot_metrics.py +++ b/QEfficient/finetune/utils/plot_metrics.py @@ -11,6 +11,8 @@ import matplotlib.pyplot as plt +from QEfficient.finetune.utils.logging_utils import logger + def plot_metric(data, metric_name, x_label, y_label, title, colors): plt.figure(figsize=(7, 6)) @@ -67,14 +69,14 @@ def plot_metrics_by_step(data, metric_name, x_label, y_label, colors): def plot_metrics(file_path): if not os.path.exists(file_path): - print(f"File {file_path} does not exist.") + logger.error(f"File {file_path} does not exist.") return with open(file_path, "r") as f: try: data = json.load(f) except json.JSONDecodeError: - print("Invalid JSON file.") + logger.error("Invalid JSON file.") return directory = os.path.dirname(file_path) diff --git a/QEfficient/finetune/utils/train_utils.py b/QEfficient/finetune/utils/train_utils.py index 9f9f06917..624889f6e 100644 --- a/QEfficient/finetune/utils/train_utils.py +++ b/QEfficient/finetune/utils/train_utils.py @@ -19,6 +19,7 @@ from tqdm import tqdm from QEfficient.finetune.configs.training import TrainConfig +from QEfficient.finetune.utils.logging_utils import logger try: import torch_qaic # noqa: F401 @@ -27,7 +28,7 @@ import torch_qaic.utils as qaic_utils # noqa: F401 from torch.qaic.amp import GradScaler as QAicGradScaler except ImportError as e: - print(f"Warning: {e}. Moving ahead without these qaic modules.") + logger.log_rank_zero(f"{e}. 
Moving ahead without these qaic modules.") from torch.amp import GradScaler @@ -83,10 +84,7 @@ def train( max_steps_reached = False # Flag to indicate max training steps reached tensorboard_updates = None - if train_config.enable_ddp: - if local_rank == 0: - tensorboard_updates = SummaryWriter() - else: + if (not train_config.enable_ddp) or (local_rank == 0): tensorboard_updates = SummaryWriter() device_type = torch.device(device).type @@ -113,27 +111,21 @@ def train( # Start the training loop for epoch in range(train_config.num_epochs): if loss_0_counter.item() == train_config.convergence_counter: - if train_config.enable_ddp: - print( - f"Not proceeding with epoch {epoch + 1} on device {local_rank} since loss value has been <= {train_config.convergence_loss} for last {loss_0_counter.item()} steps." - ) - break - else: - print( - f"Not proceeding with epoch {epoch + 1} since loss value has been <= {train_config.convergence_loss} for last {loss_0_counter.item()} steps." - ) - break + logger.log_rank_zero( + f"Skipping epoch {epoch + 1} since loss value has been <= {train_config.convergence_loss} for last {loss_0_counter.item()} steps." + ) + break if train_config.use_peft and train_config.from_peft_checkpoint: intermediate_epoch = int(train_config.from_peft_checkpoint.split("/")[-2].split("_")[-1]) - 1 if epoch < intermediate_epoch: - print(f"Skipping epoch {epoch + 1} since fine tuning has already completed for it.") + logger.log_rank_zero(f"Skipping epoch {epoch + 1} since fine tuning has already completed for it.") # to bring the count of train_step in sync with where it left off total_train_steps += len(train_dataloader) continue - print(f"Starting epoch {epoch + 1}/{train_config.num_epochs}") - print(f"train_config.max_train_step: {train_config.max_train_step}") + logger.log_rank_zero(f"Starting epoch {epoch + 1}/{train_config.num_epochs}") + logger.log_rank_zero(f"train_config.max_train_step: {train_config.max_train_step}") # stop when the maximum number of training steps is reached if max_steps_reached: break @@ -160,8 +152,8 @@ def train( # to bring the count of train_step in sync with where it left off if epoch == intermediate_epoch and step == 0: total_train_steps += intermediate_step - print( - f"skipping first {intermediate_step} steps for epoch {epoch + 1}, since fine tuning has already completed for them." + logger.log_rank_zero( + f"Skipping first {intermediate_step} steps for epoch {epoch + 1}, since fine tuning has already completed for it." 
) if epoch == intermediate_epoch and step < intermediate_step: total_train_steps += 1 @@ -197,7 +189,7 @@ def train( labels = batch["labels"][:, 0] preds = torch.nn.functional.softmax(logits, dim=-1) acc_helper.forward(preds, labels) - print("Mismatches detected:", verifier.get_perop_mismatch_count()) + logger.info("Mismatches detected:", verifier.get_perop_mismatch_count()) else: model_outputs = model(**batch) loss = model_outputs.loss # Forward call @@ -223,10 +215,7 @@ def train( else: loss_0_counter = torch.tensor([0]).to(device) - if train_config.enable_ddp: - if local_rank == 0: - tensorboard_updates.add_scalars("loss", {"train": loss}, total_train_steps) - else: + if (not train_config.enable_ddp) or (local_rank == 0): tensorboard_updates.add_scalars("loss", {"train": loss}, total_train_steps) if train_config.save_metrics: @@ -277,18 +266,11 @@ def train( val_step_metric, val_metric, ) - if train_config.enable_ddp: - if loss_0_counter.item() == train_config.convergence_counter: - print( - f"Loss value has been <= {train_config.convergence_loss} for last {loss_0_counter.item()} steps. Hence, stopping the fine tuning on device {local_rank}." - ) - break - else: - if loss_0_counter.item() == train_config.convergence_counter: - print( - f"Loss value has been <= {train_config.convergence_loss} for last {loss_0_counter.item()} steps. Hence, stopping the fine tuning." - ) - break + if loss_0_counter.item() == train_config.convergence_counter: + logger.log_rank_zero( + f"Loss value has been <= {train_config.convergence_loss} for last {loss_0_counter.item()} steps.Hence,stopping the fine tuning." + ) + break pbar.close() epoch_end_time = time.perf_counter() - epoch_start_time @@ -318,18 +300,10 @@ def train( lr_scheduler.step() if train_config.run_validation: - if train_config.enable_ddp: - dist.barrier() - eval_epoch_loss, eval_metric, temp_val_loss, temp_step_metric = evaluation_helper( - model, train_config, eval_dataloader, device - ) - if local_rank == 0: - tensorboard_updates.add_scalars("loss", {"eval": eval_epoch_loss}, total_train_steps) - - else: - eval_epoch_loss, eval_metric, temp_val_loss, temp_step_metric = evaluation_helper( - model, train_config, eval_dataloader, device - ) + eval_epoch_loss, eval_metric, temp_val_loss, temp_step_metric = evaluation_helper( + model, train_config, eval_dataloader, device + ) + if (not train_config.enable_ddp) or (local_rank == 0): tensorboard_updates.add_scalars("loss", {"eval": eval_epoch_loss}, total_train_steps) if train_config.save_metrics: @@ -347,15 +321,15 @@ def train( if train_config.run_validation: if eval_epoch_loss < best_val_loss: best_val_loss = eval_epoch_loss - print(f"best eval loss on epoch {epoch + 1} is {best_val_loss}") + logger.log_rank_zero(f"best eval loss on epoch {epoch + 1} is {best_val_loss}") val_loss.append(float(eval_epoch_loss)) val_metric.append(float(eval_metric)) if train_config.task_type == "seq_classification": - print( + logger.log_rank_zero( f"Epoch {epoch + 1}: train_acc={metric_val:.4f}, train_epoch_loss={train_epoch_loss:.4f}, epoch time {epoch_end_time}s" ) else: - print( + logger.log_rank_zero( f"Epoch {epoch + 1}: train_metric={metric_val:.4f}, train_epoch_loss={train_epoch_loss:.4f}, epoch time {epoch_end_time}s" ) @@ -403,6 +377,9 @@ def evaluation_helper(model, train_config, eval_dataloader, device): Returns: eval_epoch_loss, eval_metric, eval_step_loss, eval_step_metric """ + if train_config.enable_ddp: + dist.barrier() + model.eval() if train_config.task_type == "seq_classification": @@ -462,7 
+439,7 @@ def evaluation_helper(model, train_config, eval_dataloader, device): eval_metric = torch.exp(eval_epoch_loss) # Print evaluation metrics - print(f" {eval_metric.detach().cpu()=} {eval_epoch_loss.detach().cpu()=}") + logger.log_rank_zero(f"{eval_metric.detach().cpu()=} {eval_epoch_loss.detach().cpu()=}") return eval_epoch_loss, eval_metric, val_step_loss, val_step_metric @@ -475,18 +452,28 @@ def get_longest_seq_length(data: List[Dict]) -> Tuple[int, int]: return longest_seq_length, longest_seq_ix -def print_model_size(model, config) -> None: +def print_model_size(model) -> None: """ Print model name, the number of trainable parameters and initialization time. Args: - model: The PyTorch model. - model_name (str): Name of the model. + model: PyTorch model. """ - - print(f"--> Model {config.model_name}") total_params = sum(p.numel() for p in model.parameters() if p.requires_grad) - print(f"\n--> {config.model_name} has {total_params / 1e6} Million params\n") + logger.log_rank_zero(f"Model has {total_params / 1e6} Million params.") + + +def print_trainable_parameters(model) -> None: + """ + Print the number of trainable parameters, all params and percentage of trainablke params. + + Args: + model: The PyTorch model. + """ + trainable_params, all_param = model.get_nb_trainable_parameters() + logger.log_rank_zero( + f"Trainable params: {trainable_params:,d} || all params: {all_param:,d} || trainable%: {100 * trainable_params / all_param:.4f}" + ) def save_to_json( From 1e1519b5e8956a12c6bc9dc109580d2ea992ce17 Mon Sep 17 00:00:00 2001 From: Mamta Singh Date: Wed, 25 Jun 2025 09:39:16 +0000 Subject: [PATCH 2/3] address comments Signed-off-by: Mamta Singh --- QEfficient/finetune/dataset/custom_dataset.py | 6 ++++-- .../finetune/dataset/grammar_dataset.py | 7 ++----- QEfficient/finetune/eval.py | 7 +------ QEfficient/finetune/utils/helper.py | 6 ++++++ QEfficient/finetune/utils/logging_utils.py | 6 ++---- QEfficient/finetune/utils/train_utils.py | 19 +++++++------------ 6 files changed, 22 insertions(+), 29 deletions(-) diff --git a/QEfficient/finetune/dataset/custom_dataset.py b/QEfficient/finetune/dataset/custom_dataset.py index 757f32e01..45e5521c6 100644 --- a/QEfficient/finetune/dataset/custom_dataset.py +++ b/QEfficient/finetune/dataset/custom_dataset.py @@ -64,6 +64,8 @@ def get_data_collator(dataset_processer, dataset_config): try: return getattr(module, func_name)(dataset_processer) except AttributeError: - logger.info(f"Can not find the custom data_collator in the dataset.py file ({module_path.as_posix()}).") - logger.info("Using the default data_collator instead.") + logger.log_rank_zero( + f"Can not find the custom data_collator in the dataset.py file ({module_path.as_posix()})." 
+ ) + logger.log_rank_zero("Using the default data_collator instead.") return None diff --git a/QEfficient/finetune/dataset/grammar_dataset.py b/QEfficient/finetune/dataset/grammar_dataset.py index 1261f4d93..7956ca781 100644 --- a/QEfficient/finetune/dataset/grammar_dataset.py +++ b/QEfficient/finetune/dataset/grammar_dataset.py @@ -38,7 +38,7 @@ def convert_to_features(self, example_batch): # Create prompt and tokenize contexts and questions if self.print_text: - logger.info("Input Text: ", self.clean_text(example_batch["text"])) + logger.log_rank_zero("Input Text: ", self.clean_text(example_batch["text"])) input_ = example_batch["input"] target_ = example_batch["target"] @@ -73,9 +73,6 @@ def get_dataset(dataset_config, tokenizer, csv_name=None, context_length=None): """cover function for handling loading the working dataset""" """dataset loading""" currPath = Path.cwd() / "datasets_grammar" / "grammar_train.csv" - logger.info(f"Loading dataset {currPath}") - csv_name = str(currPath) - logger.info(csv_name) - dataset = grammar(tokenizer=tokenizer, csv_name=csv_name, context_length=context_length) + dataset = grammar(tokenizer=tokenizer, csv_name=str(currPath), context_length=context_length) return dataset diff --git a/QEfficient/finetune/eval.py b/QEfficient/finetune/eval.py index 4ea6b70f3..94d718f2c 100644 --- a/QEfficient/finetune/eval.py +++ b/QEfficient/finetune/eval.py @@ -84,18 +84,13 @@ def main(**kwargs): print_model_size(model) if train_config.run_validation: - # TODO: vbaddi enable packing later in entire infra. - # if train_config.batching_strategy == "packing": - # dataset_val = ConcatDataset(dataset_val, chunk_size=train_config.context_length) - eval_dataloader = get_dataloader(tokenizer, dataset_config, train_config, split="test") - logger.log_rank_zero(f"Num of Validation Set Batches loaded = {len(eval_dataloader)}") if len(eval_dataloader) == 0: raise ValueError( f"The eval set size is too small for dataloader to load even one batch. Please increase the size of eval set. 
({len(eval_dataloader)=})" ) else: - logger.log_rank_zero(f"Num of Validation Set Batches loaded = {len(eval_dataloader)}") + logger.log_rank_zero(f"Number of Validation Set Batches loaded = {len(eval_dataloader)}") model.to(device) _ = evaluation(model, train_config, eval_dataloader, None, tokenizer, device) diff --git a/QEfficient/finetune/utils/helper.py b/QEfficient/finetune/utils/helper.py index fcc44fec8..ae7f47c83 100644 --- a/QEfficient/finetune/utils/helper.py +++ b/QEfficient/finetune/utils/helper.py @@ -5,7 +5,13 @@ # # ----------------------------------------------------------------------------- +import os + TASK_TYPE = ["generation", "seq_classification"] PEFT_METHOD = ["lora"] DEVICE = ["qaic", "cpu", "cuda"] BATCHING_STRATEGY = ["padding", "packing"] + + +def is_rank_zero(): + return int(os.getenv("LOCAL_RANK", 0)) == 0 diff --git a/QEfficient/finetune/utils/logging_utils.py b/QEfficient/finetune/utils/logging_utils.py index 86dcf1e59..856fc100f 100644 --- a/QEfficient/finetune/utils/logging_utils.py +++ b/QEfficient/finetune/utils/logging_utils.py @@ -9,8 +9,7 @@ import os from datetime import datetime -import torch.distributed as dist - +from QEfficient.finetune.utils.helper import is_rank_zero from QEfficient.utils.constants import ROOT_DIR @@ -27,8 +26,7 @@ def raise_runtimeerror(message): raise RuntimeError(message) def log_rank_zero(msg: str, level: int = logging.INFO): - rank = dist.get_rank() if dist.is_available() and dist.is_initialized() else 0 - if rank != 0: + if not is_rank_zero: return self.logger.log(level, msg, stacklevel=2) diff --git a/QEfficient/finetune/utils/train_utils.py b/QEfficient/finetune/utils/train_utils.py index 624889f6e..b5ce6a736 100644 --- a/QEfficient/finetune/utils/train_utils.py +++ b/QEfficient/finetune/utils/train_utils.py @@ -19,6 +19,7 @@ from tqdm import tqdm from QEfficient.finetune.configs.training import TrainConfig +from QEfficient.finetune.utils.helper import is_rank_zero from QEfficient.finetune.utils.logging_utils import logger try: @@ -84,7 +85,7 @@ def train( max_steps_reached = False # Flag to indicate max training steps reached tensorboard_updates = None - if (not train_config.enable_ddp) or (local_rank == 0): + if is_rank_zero(): tensorboard_updates = SummaryWriter() device_type = torch.device(device).type @@ -202,20 +203,14 @@ def train( total_loss += loss.detach().float() # Accumalate gradients loss = loss / train_config.gradient_accumulation_steps - if train_config.enable_ddp: - if local_rank == 0: - if loss <= train_config.convergence_loss: - loss_0_counter += 1 - else: - loss_0_counter = torch.tensor([0]).to(device) - dist.broadcast(loss_0_counter, src=0) - else: + if is_rank_zero(): if loss <= train_config.convergence_loss: loss_0_counter += 1 else: loss_0_counter = torch.tensor([0]).to(device) - - if (not train_config.enable_ddp) or (local_rank == 0): + if train_config.enable_ddp: + dist.broadcast(loss_0_counter, src=0) + if is_rank_zero(): tensorboard_updates.add_scalars("loss", {"train": loss}, total_train_steps) if train_config.save_metrics: @@ -303,7 +298,7 @@ def train( eval_epoch_loss, eval_metric, temp_val_loss, temp_step_metric = evaluation_helper( model, train_config, eval_dataloader, device ) - if (not train_config.enable_ddp) or (local_rank == 0): + if is_rank_zero(): tensorboard_updates.add_scalars("loss", {"eval": eval_epoch_loss}, total_train_steps) if train_config.save_metrics: From 3d8a53e4bd71063145b0b4ad2a048946695aad1f Mon Sep 17 00:00:00 2001 From: Mamta Singh Date: Wed, 25 Jun 2025 14:08:00 
+0000 Subject: [PATCH 3/3] modify error handling Signed-off-by: Mamta Singh --- QEfficient/cloud/finetune.py | 19 +++++++------- QEfficient/finetune/configs/training.py | 1 - QEfficient/finetune/dataset/alpaca_dataset.py | 10 ++++++- QEfficient/finetune/dataset/custom_dataset.py | 17 +++++++----- .../finetune/dataset/grammar_dataset.py | 8 +++--- QEfficient/finetune/eval.py | 9 ++++--- QEfficient/finetune/utils/config_utils.py | 26 +++++++++++-------- QEfficient/finetune/utils/dataset_utils.py | 7 ++--- QEfficient/finetune/utils/logging_utils.py | 16 ++++++------ QEfficient/finetune/utils/parser.py | 14 ++++------ QEfficient/finetune/utils/plot_metrics.py | 4 +-- QEfficient/finetune/utils/train_utils.py | 5 ++-- 12 files changed, 76 insertions(+), 60 deletions(-) diff --git a/QEfficient/cloud/finetune.py b/QEfficient/cloud/finetune.py index e0563d11b..63fe2106a 100644 --- a/QEfficient/cloud/finetune.py +++ b/QEfficient/cloud/finetune.py @@ -5,6 +5,7 @@ # # ----------------------------------------------------------------------------- +import logging import random import warnings from typing import Any, Dict, Optional, Union @@ -40,7 +41,7 @@ try: import torch_qaic # noqa: F401 except ImportError as e: - logger.log_rank_zero(f"{e}. Moving ahead without these qaic modules.") + logger.log_rank_zero(f"{e}. Moving ahead without these qaic modules.", logging.WARNING) # Suppress all warnings @@ -121,7 +122,7 @@ def load_model_and_tokenizer( ) if not hasattr(model, "base_model_prefix"): - logger.raise_runtimeerror("Given huggingface model does not have 'base_model_prefix' attribute.") + logger.raise_error("Given huggingface model does not have 'base_model_prefix' attribute.", RuntimeError) for param in getattr(model, model.base_model_prefix).parameters(): param.requires_grad = False @@ -146,7 +147,7 @@ def load_model_and_tokenizer( # If there is a mismatch between tokenizer vocab size and embedding matrix, # throw a warning and then expand the embedding matrix if len(tokenizer) > model.get_input_embeddings().weight.shape[0]: - logger.log_rank_zero("Resizing the embedding matrix to match the tokenizer vocab size.", logger.WARNING) + logger.log_rank_zero("Resizing the embedding matrix to match the tokenizer vocab size.", logging.WARNING) model.resize_token_embeddings(len(tokenizer)) print_model_size(model) @@ -161,8 +162,8 @@ def load_model_and_tokenizer( if hasattr(model, "supports_gradient_checkpointing") and model.supports_gradient_checkpointing: model.gradient_checkpointing_enable(gradient_checkpointing_kwargs={"preserve_rng_state": False}) else: - logger.raise_runtimeerror( - "Given model doesn't support gradient checkpointing. Please disable it and run it." + logger.raise_error( + "Given model doesn't support gradient checkpointing. Please disable it and run it.", RuntimeError ) model = apply_peft(model, train_config, peft_config_file, **kwargs) @@ -237,8 +238,9 @@ def setup_dataloaders( if train_config.run_validation: eval_dataloader = get_dataloader(tokenizer, dataset_config, train_config, split="val") if len(eval_dataloader) == 0: - logger.raise_runtimeerror( - f"The eval set size is too small for dataloader to load even one batch. Please increase the size of eval set. ({len(eval_dataloader)=})" + logger.raise_error( + f"The eval set size is too small for dataloader to load even one batch. Please increase the size of eval set. 
({len(eval_dataloader)=})", + ValueError, ) else: logger.log_rank_zero(f"Number of Validation Set Batches loaded = {len(eval_dataloader)}") @@ -280,8 +282,7 @@ def main(peft_config_file: str = None, **kwargs) -> None: dataset_config = generate_dataset_config(train_config.dataset) update_config(dataset_config, **kwargs) - logger.prepare_dump_logs(train_config.dump_logs) - logger.setLevel(train_config.log_level) + logger.prepare_for_logs(train_config.output_dir, train_config.dump_logs, train_config.log_level) setup_distributed_training(train_config) setup_seeds(train_config.seed) diff --git a/QEfficient/finetune/configs/training.py b/QEfficient/finetune/configs/training.py index 272819120..383d0e2b4 100644 --- a/QEfficient/finetune/configs/training.py +++ b/QEfficient/finetune/configs/training.py @@ -95,7 +95,6 @@ class TrainConfig: use_profiler: bool = False # Enable pytorch profiler, can not be used with flop counter at the same time. # profiler_dir: str = "PATH/to/save/profiler/results" # will be used if using profiler - dump_root_dir: str = "mismatches/step_" opByOpVerifier: bool = False dump_logs: bool = True diff --git a/QEfficient/finetune/dataset/alpaca_dataset.py b/QEfficient/finetune/dataset/alpaca_dataset.py index aecc0d2cc..c6ddb6ce1 100644 --- a/QEfficient/finetune/dataset/alpaca_dataset.py +++ b/QEfficient/finetune/dataset/alpaca_dataset.py @@ -11,6 +11,8 @@ import torch from torch.utils.data import Dataset +from QEfficient.finetune.utils.logging_utils import logger + PROMPT_DICT = { "prompt_input": ( "Below is an instruction that describes a task, paired with an input that provides further context. " @@ -27,7 +29,13 @@ class InstructionDataset(Dataset): def __init__(self, dataset_config, tokenizer, partition="train", context_length=None): - self.ann = json.load(open(dataset_config.data_path)) + try: + self.ann = json.load(open(dataset_config.data_path)) + except FileNotFoundError: + logger.raise_error( + "Loading of alpaca dataset failed! Please use (wget -c https://raw.githubusercontent.com/tatsu-lab/stanford_alpaca/refs/heads/main/alpaca_data.json -P dataset/) to download the alpaca dataset.", + FileNotFoundError, + ) # Use 5% of the dataset for evaluation eval_length = int(len(self.ann) / 20) if partition == "train": diff --git a/QEfficient/finetune/dataset/custom_dataset.py b/QEfficient/finetune/dataset/custom_dataset.py index 45e5521c6..4a1f500e3 100644 --- a/QEfficient/finetune/dataset/custom_dataset.py +++ b/QEfficient/finetune/dataset/custom_dataset.py @@ -32,18 +32,21 @@ def get_custom_dataset(dataset_config, tokenizer, split: str, context_length=Non module_path, func_name = dataset_config.file, "get_custom_dataset" if not module_path.endswith(".py"): - logger.raise_runtimeerror(f"Dataset file {module_path} is not a .py file.") + logger.raise_error(f"Dataset file {module_path} is not a .py file.", ValueError) module_path = Path(module_path) if not module_path.is_file(): - logger.raise_runtimeerror(f"Dataset py file {module_path.as_posix()} does not exist or is not a file.") + logger.raise_error( + f"Dataset py file {module_path.as_posix()} does not exist or is not a file.", FileNotFoundError + ) module = load_module_from_py_file(module_path.as_posix()) try: return getattr(module, func_name)(dataset_config, tokenizer, split, context_length) except AttributeError: - logger.raise_runtimeerror( - f"It seems like the given method name ({func_name}) is not present in the dataset .py file ({module_path.as_posix()})." 
+ logger.raise_error( + f"It seems like the given method name ({func_name}) is not present in the dataset .py file ({module_path.as_posix()}).", + AttributeError, ) @@ -54,11 +57,13 @@ def get_data_collator(dataset_processer, dataset_config): module_path, func_name = dataset_config.file, "get_data_collator" if not module_path.endswith(".py"): - logger.raise_runtimeerror(f"Dataset file {module_path} is not a .py file.") + logger.raise_error(f"Dataset file {module_path} is not a .py file.", ValueError) module_path = Path(module_path) if not module_path.is_file(): - logger.raise_runtimeerror(f"Dataset py file {module_path.as_posix()} does not exist or is not a file.") + logger.raise_error( + f"Dataset py file {module_path.as_posix()} does not exist or is not a file.", FileNotFoundError + ) module = load_module_from_py_file(module_path.as_posix()) try: diff --git a/QEfficient/finetune/dataset/grammar_dataset.py b/QEfficient/finetune/dataset/grammar_dataset.py index 7956ca781..e40c01e97 100644 --- a/QEfficient/finetune/dataset/grammar_dataset.py +++ b/QEfficient/finetune/dataset/grammar_dataset.py @@ -21,11 +21,11 @@ def __init__(self, tokenizer, csv_name=None, context_length=None): data_files={"train": [csv_name]}, # "eval": "grammar_validation.csv"}, delimiter=",", ) - except Exception as e: - logger.raise_runtimeerror( - "Loading of grammar dataset failed! Please check (https://github.com/meta-llama/llama-recipes/blob/main/src/llama_recipes/datasets/grammar_dataset/grammar_dataset_process.ipynb) for details on how to download the dataset." + except FileNotFoundError: + logger.raise_error( + "Loading of grammar dataset failed! Please check (https://github.com/meta-llama/llama-recipes/blob/main/src/llama_recipes/datasets/grammar_dataset/grammar_dataset_process.ipynb) for details on how to download the dataset.", + FileNotFoundError, ) - raise e self.context_length = context_length self.tokenizer = tokenizer diff --git a/QEfficient/finetune/eval.py b/QEfficient/finetune/eval.py index 94d718f2c..72407a91e 100644 --- a/QEfficient/finetune/eval.py +++ b/QEfficient/finetune/eval.py @@ -26,7 +26,7 @@ device = "qaic:0" except ImportError as e: - logger.warning(f"{e}. Moving ahead without these qaic modules.") + logger.log_rank_zero(f"{e}. Moving ahead without these qaic modules.") device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # Suppress all warnings @@ -78,7 +78,7 @@ def main(**kwargs): # If there is a mismatch between tokenizer vocab size and embedding matrix, # throw a warning and then expand the embedding matrix if len(tokenizer) > model.get_input_embeddings().weight.shape[0]: - logger.warning("Resizing the embedding matrix to match the tokenizer vocab size.") + logger.log_rank_zero("Resizing the embedding matrix to match the tokenizer vocab size.") model.resize_token_embeddings(len(tokenizer)) print_model_size(model) @@ -86,8 +86,9 @@ def main(**kwargs): if train_config.run_validation: eval_dataloader = get_dataloader(tokenizer, dataset_config, train_config, split="test") if len(eval_dataloader) == 0: - raise ValueError( - f"The eval set size is too small for dataloader to load even one batch. Please increase the size of eval set. ({len(eval_dataloader)=})" + logger.raise_error( + f"The eval set size is too small for dataloader to load even one batch. Please increase the size of eval set. 
({len(eval_dataloader)=})", + ValueError, ) else: logger.log_rank_zero(f"Number of Validation Set Batches loaded = {len(eval_dataloader)}") diff --git a/QEfficient/finetune/utils/config_utils.py b/QEfficient/finetune/utils/config_utils.py index 7e8cc6d85..90c15cd7f 100644 --- a/QEfficient/finetune/utils/config_utils.py +++ b/QEfficient/finetune/utils/config_utils.py @@ -44,7 +44,9 @@ def update_config(config, **kwargs): if hasattr(config, param_name): setattr(config, param_name, v) else: - raise ValueError(f"Config '{config_name}' does not have parameter: '{param_name}'") + logger.raise_error( + f"Config '{config_name}' does not have parameter: '{param_name}'", ValueError + ) else: config_type = type(config).__name__ logger.debug(f"Unknown parameter '{k}' for config type '{config_type}'") @@ -70,7 +72,7 @@ def generate_peft_config(train_config: TrainConfig, peft_config_file: str = None else: config_map = {"lora": (LoraConfig, PeftLoraConfig)} if train_config.peft_method not in config_map: - raise RuntimeError(f"Peft config not found: {train_config.peft_method}") + logger.raise_error(f"Peft config not found: {train_config.peft_method}", RuntimeError) config_cls, peft_config_cls = config_map[train_config.peft_method] if config_cls is None: @@ -119,7 +121,7 @@ def validate_config(config_data: Dict[str, Any], config_type: str = "lora") -> N - Ensures types match expected values (int, float, list, etc.). """ if config_type.lower() != "lora": - raise ValueError(f"Unsupported config_type: {config_type}. Only 'lora' is supported.") + logger.raise_error(f"Unsupported config_type: {config_type}. Only 'lora' is supported.", ValueError) required_fields = { "r": int, @@ -136,26 +138,28 @@ def validate_config(config_data: Dict[str, Any], config_type: str = "lora") -> N # Check for missing required fields missing_fields = [field for field in required_fields if field not in config_data] if missing_fields: - raise ValueError(f"Missing required fields in {config_type} config: {missing_fields}") + logger.raise_error(f"Missing required fields in {config_type} config: {missing_fields}", ValueError) # Validate types of required fields for field, expected_type in required_fields.items(): if not isinstance(config_data[field], expected_type): - raise ValueError( + logger.raise_error( f"Field '{field}' in {config_type} config must be of type {expected_type.__name__}, " - f"got {type(config_data[field]).__name__}" + f"got {type(config_data[field]).__name__}", + ValueError, ) # Validate target_modules contains strings if not all(isinstance(mod, str) for mod in config_data["target_modules"]): - raise ValueError("All elements in 'target_modules' must be strings") + logger.raise_error("All elements in 'target_modules' must be strings", ValueError) # Validate types of optional fields if present for field, expected_type in optional_fields.items(): if field in config_data and not isinstance(config_data[field], expected_type): - raise ValueError( + logger.raise_error( f"Field '{field}' in {config_type} config must be of type {expected_type.__name__}, " - f"got {type(config_data[field]).__name__}" + f"got {type(config_data[field]).__name__}", + ValueError, ) @@ -173,7 +177,7 @@ def load_config_file(config_path: str) -> Dict[str, Any]: ValueError: If the file format is unsupported. 
""" if not os.path.exists(config_path): - raise FileNotFoundError(f"Config file not found: {config_path}") + logger.raise_error(f"Config file not found: {config_path}", FileNotFoundError) with open(config_path, "r") as f: if config_path.endswith(".yaml") or config_path.endswith(".yml"): @@ -181,4 +185,4 @@ def load_config_file(config_path: str) -> Dict[str, Any]: elif config_path.endswith(".json"): return json.load(f) else: - raise ValueError("Unsupported config file format. Use .yaml, .yml, or .json") + logger.raise_error("Unsupported config file format. Use .yaml, .yml, or .json", ValueError) diff --git a/QEfficient/finetune/utils/dataset_utils.py b/QEfficient/finetune/utils/dataset_utils.py index de9163175..73963cf0c 100644 --- a/QEfficient/finetune/utils/dataset_utils.py +++ b/QEfficient/finetune/utils/dataset_utils.py @@ -18,7 +18,7 @@ def get_preprocessed_dataset( tokenizer, dataset_config, split: str = "train", context_length: int = None ) -> torch.utils.data.Dataset: if dataset_config.dataset not in DATASET_PREPROC: - raise NotImplementedError(f"{dataset_config.dataset} is not (yet) implemented") + logger.raise_error(f"{dataset_config.dataset} is not (yet) implemented", NotImplementedError) def get_split(): return dataset_config.train_split if split == "train" else dataset_config.test_split @@ -39,8 +39,9 @@ def get_dataloader_kwargs(train_config, dataset, dataset_processer, split): if train_config.enable_ddp: if train_config.enable_sorting_for_ddp: if train_config.context_length: - raise ValueError( - "Sorting cannot be done with padding, Please disable sorting or pass context_length as None to disable padding" + logger.raise_error( + "Sorting cannot be done with padding, Please disable sorting or pass context_length as None to disable padding", + ValueError, ) else: kwargs["batch_sampler"] = DistributedLengthBasedBatchSampler( diff --git a/QEfficient/finetune/utils/logging_utils.py b/QEfficient/finetune/utils/logging_utils.py index 856fc100f..c314fc038 100644 --- a/QEfficient/finetune/utils/logging_utils.py +++ b/QEfficient/finetune/utils/logging_utils.py @@ -10,29 +10,29 @@ from datetime import datetime from QEfficient.finetune.utils.helper import is_rank_zero -from QEfficient.utils.constants import ROOT_DIR class FTLogger: - def __init__(self, level=logging.DEBUG): + def __init__(self): self.logger = logging.getLogger("QEfficient") if not getattr(self.logger, "_custom_methods_added", False): self._bind_custom_methods() self.logger._custom_methods_added = True # Prevent adding handlers/methods twice def _bind_custom_methods(self): - def raise_runtimeerror(message): + def raise_error(message, errortype=RuntimeError): self.logger.error(message) - raise RuntimeError(message) + raise errortype(message) def log_rank_zero(msg: str, level: int = logging.INFO): if not is_rank_zero: return self.logger.log(level, msg, stacklevel=2) - def prepare_dump_logs(dump_logs=False, level=logging.INFO): + def prepare_for_logs(output_path, dump_logs=False, level=logging.INFO): + self.logger.setLevel(level) if dump_logs: - logs_path = os.path.join(ROOT_DIR, "logs") + logs_path = os.path.join(output_path, "logs") if not os.path.exists(logs_path): os.makedirs(logs_path, exist_ok=True) file_name = f"log-file-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" + ".txt" @@ -44,9 +44,9 @@ def prepare_dump_logs(dump_logs=False, level=logging.INFO): fh.setFormatter(formatter) self.logger.addHandler(fh) - self.logger.raise_runtimeerror = raise_runtimeerror + self.logger.raise_error = raise_error 
self.logger.log_rank_zero = log_rank_zero - self.logger.prepare_dump_logs = prepare_dump_logs + self.logger.prepare_for_logs = prepare_for_logs def get_logger(self): return self.logger diff --git a/QEfficient/finetune/utils/parser.py b/QEfficient/finetune/utils/parser.py index 39ce5f969..980f6a3b9 100644 --- a/QEfficient/finetune/utils/parser.py +++ b/QEfficient/finetune/utils/parser.py @@ -254,18 +254,14 @@ def get_finetune_parser(): action="store_true", help="Enable distributed data parallel training. This will load the replicas of model on given number of devices and train the model. This should be used using torchrun interface. Please check docs for exact usage.", ) - parser.add_argument( - "--dump_root_dir", - "--dump-root-dir", - required=False, - type=str, - default="mismatches/step_", - help="Directory for mismatch dumps by opByOpVerifier", - ) parser.add_argument( "--opByOpVerifier", action="store_true", - help="Enable operation-by-operation verification w.r.t reference device(cpu). It is a context manager interface that captures and verifies each operator against reference device. In case results of test & reference do not match under given tolerances, a standalone unittest is generated at dump_root_dir.", + help=argparse.SUPPRESS, + # This is for debugging purpose only. + # Enables operation-by-operation verification w.r.t reference device(cpu). + # It is a context manager interface that captures and verifies each operator against reference device. + # In case results of test & reference do not match under given tolerances, a standalone unittest is generated at dump_root_dir. ) return parser diff --git a/QEfficient/finetune/utils/plot_metrics.py b/QEfficient/finetune/utils/plot_metrics.py index e91828d83..208a706b1 100644 --- a/QEfficient/finetune/utils/plot_metrics.py +++ b/QEfficient/finetune/utils/plot_metrics.py @@ -69,14 +69,14 @@ def plot_metrics_by_step(data, metric_name, x_label, y_label, colors): def plot_metrics(file_path): if not os.path.exists(file_path): - logger.error(f"File {file_path} does not exist.") + logger.raise_error(f"File {file_path} does not exist.", FileNotFoundError) return with open(file_path, "r") as f: try: data = json.load(f) except json.JSONDecodeError: - logger.error("Invalid JSON file.") + logger.raise_error("Invalid JSON file.", json.JSONDecodeError) return directory = os.path.dirname(file_path) diff --git a/QEfficient/finetune/utils/train_utils.py b/QEfficient/finetune/utils/train_utils.py index b5ce6a736..6eec0887a 100644 --- a/QEfficient/finetune/utils/train_utils.py +++ b/QEfficient/finetune/utils/train_utils.py @@ -85,8 +85,9 @@ def train( max_steps_reached = False # Flag to indicate max training steps reached tensorboard_updates = None + tensorboard_log_dir = train_config.output_dir + "/runs/" + f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}" if is_rank_zero(): - tensorboard_updates = SummaryWriter() + tensorboard_updates = SummaryWriter(log_dir=tensorboard_log_dir) device_type = torch.device(device).type @@ -181,7 +182,7 @@ def train( atol=1e-1, use_ref_output_on_mismatch=True, filter_config=qaic_debug.DispatchFilterConfig.default(device), - dump_root_dir=train_config.dump_root_dir + str(step), + dump_root_dir=train_config.output_dir + "/mismatches/step_" + str(step), ) as verifier: model_outputs = model(**batch) loss = model_outputs.loss # Forward call
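
Editor's sketch (not part of the patches above): the three commits converge on a small logging layer — a module-level "QEfficient" logger with raise_error, log_rank_zero, and prepare_for_logs bound as custom methods, plus an is_rank_zero() helper keyed off LOCAL_RANK. The condensed, standalone approximation below is assembled from the final state of QEfficient/finetune/utils/logging_utils.py and helper.py; the "./output" directory, the example messages, and the __main__ driver are illustrative placeholders, and the rank guard here is written as an explicit call to is_rank_zero().

import logging
import os
from datetime import datetime


def is_rank_zero() -> bool:
    # Mirrors helper.py from patch 2: rank 0 when LOCAL_RANK is unset or "0".
    return int(os.getenv("LOCAL_RANK", 0)) == 0


class FTLogger:
    def __init__(self):
        self.logger = logging.getLogger("QEfficient")
        if not getattr(self.logger, "_custom_methods_added", False):
            self._bind_custom_methods()
            self.logger._custom_methods_added = True  # bind the helpers only once

    def _bind_custom_methods(self):
        def raise_error(message: str, errortype=RuntimeError):
            # Log the message, then raise it as the requested exception type.
            self.logger.error(message)
            raise errortype(message)

        def log_rank_zero(msg: str, level: int = logging.INFO):
            # Only the rank-0 process emits the record (note the explicit call).
            if not is_rank_zero():
                return
            self.logger.log(level, msg, stacklevel=2)

        def prepare_for_logs(output_path: str, dump_logs: bool = False, level=logging.INFO):
            # Set the level and optionally add a timestamped file handler under <output_path>/logs.
            self.logger.setLevel(level)
            if dump_logs:
                logs_path = os.path.join(output_path, "logs")
                os.makedirs(logs_path, exist_ok=True)
                file_name = f"log-file-{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.txt"
                fh = logging.FileHandler(os.path.join(logs_path, file_name))
                fh.setLevel(level)
                fh.setFormatter(logging.Formatter("%(levelname)s - %(name)s - %(message)s"))
                self.logger.addHandler(fh)

        self.logger.raise_error = raise_error
        self.logger.log_rank_zero = log_rank_zero
        self.logger.prepare_for_logs = prepare_for_logs

    def get_logger(self):
        return self.logger


logger = FTLogger().get_logger()

if __name__ == "__main__":
    # Usage in the spirit of QEfficient/cloud/finetune.py after patch 3 (values are placeholders).
    logger.prepare_for_logs("./output", dump_logs=True, level=logging.INFO)
    logger.log_rank_zero("Number of Training Set Batches loaded = 42")
    try:
        logger.raise_error("Dataset file data.txt is not a .py file.", ValueError)
    except ValueError:
        pass

In this pattern the custom methods are attached to the shared logging.Logger instance itself, so every module that does `from ...logging_utils import logger` sees the same handlers, level, and helpers without any extra wiring.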