diff --git a/open_lm/attention.py b/open_lm/attention.py
index 7f2e2f4c..e0e8aba5 100644
--- a/open_lm/attention.py
+++ b/open_lm/attention.py
@@ -111,7 +111,7 @@ def torch_attn(queries, keys, values, is_causal, attention_mask=None):
     if attention_mask is None:
         bias = None
         # If we only have one query, assume we don't need to be in causal mode (can attend to all keys).
-        if queries.shape[1] == 1:
+        if queries.shape[1] == 1:
             is_causal = False
     else:
         if not is_causal:
diff --git a/open_lm/data.py b/open_lm/data.py
index 107ff6e0..95c4c489 100644
--- a/open_lm/data.py
+++ b/open_lm/data.py
@@ -13,6 +13,7 @@ from functools import partial
 from itertools import islice
 import copy
 
+from datasets import concatenate_datasets
 import numpy as np
 import pandas as pd
@@ -545,21 +546,136 @@ def get_synthetic_dataset(args, is_train, epoch, tokenizer, data_key, floor):
     return DataInfo(dataloader, sampler)
 
 
+class JSONLDataset(Dataset):
+    def __init__(self, file_path, tokenizer, seq_len, padding_side):
+        self.padding_side = padding_side
+        self.urls = [file_path]
+        self.eot_token = 0
+        self.pad_token = 1
+        self.ignore_tok = -100
+        self.tokenizer = tokenizer
+        self.seq_len = seq_len
+        self.data, self.long_answer_tokens = self.load_data(file_path)
+        print(f"Loaded {len(self.data)} samples from {file_path}")
+
+    def load_data(self, file_path):
+        data = []
+        long_answer_tokens = []
+        with open(file_path, 'r') as f:
+            for line in f:
+                item = json.loads(line.strip())
+                chunks, long_answer = self.create_chunks(item)
+                data.append(chunks)
+                long_answer_tokens.append(long_answer)
+        # return data, long_answer_tokens
+        return torch.tensor(data), torch.tensor(long_answer_tokens)
+
+    def create_chunks(self, item):
+        inputs = self.tokenizer(item['instruction'] + item['input'])
+        outputs = self.tokenizer(item['output']) + [self.eot_token]
+
+        input_tokens = inputs + outputs
+        target_tokens = [self.ignore_tok] * len(inputs) + outputs
+        # if the sequence is longer than seq_len, keep only the last seq_len tokens
+        assert len(input_tokens) == len(target_tokens)
+        input_tokens = input_tokens[-self.seq_len:]
+        target_tokens = target_tokens[-self.seq_len:]
+
+        # if the sequence is shorter than seq_len, pad it to seq_len
+        input_tokens = self.pad_input(input_tokens, self.pad_token)
+        target_tokens = self.pad_input(target_tokens, self.ignore_tok)
+        return input_tokens, target_tokens
+
+    def pad_input(self, tokens, pad_token):
+        if len(tokens) < self.seq_len:
+            padding = [pad_token] * (self.seq_len - len(tokens))
+            if self.padding_side == "right":
+                tokens = tokens + padding
+            elif self.padding_side == "left":
+                tokens = padding + tokens
+            else:
+                raise ValueError("padding_side must be either 'left' or 'right'")
+        return tokens
+
+    def __len__(self):
+        return len(self.data)
+
+    def __getitem__(self, idx):
+        input_ids = self.data[idx]
+        target_ids = self.long_answer_tokens[idx]
+        if len(input_ids) != len(target_ids):
+            raise ValueError(f"Input and target sizes do not match at index {idx}: {input_ids.size()} vs {target_ids.size()}")
+
+        return input_ids, target_ids
+
+
+def get_jsonl_dataloader(args, is_train, tokenizer=None, floor=True, epoch=0, data_key="json", force_num_samples=None):
+    file_paths = args.train_data if is_train else args.val_data
+    datasets = [JSONLDataset(file_path, tokenizer, args.seq_len, args.padding_side) for file_path in file_paths]
+
+    if is_train:
+        # TODO: support multiple training files; for now only the first dataset is used
+        dataset = datasets[0]
+    # Initialize shared_epoch
+
+    if is_train:
+        shared_epoch = SharedEpoch(epoch=epoch)
+    else:
+        shared_epoch = None
+
+    if 
is_train: + global_batch_size = args.per_gpu_batch_size * args.world_size + round_fn = math.floor if floor else math.ceil + total_num_batches = 0 + total_num_samples = 0 + # for dataset in datasets: ## dataset has already been concated + num_worker_batches = round_fn(len(dataset) / (global_batch_size * max(1, args.workers))) + num_batches = num_worker_batches * max(1, args.workers) + num_samples = num_batches * global_batch_size + total_num_batches += num_batches + total_num_samples += num_samples + else: + # For validation, just use the original dataset + dataset = datasets[0] + total_num_batches = math.ceil(len(dataset) / (args.per_gpu_val_batch_size * args.world_size)) + total_num_samples = len(dataset) + + # Create the dataloader + sampler = DistributedSampler(dataset) if args.distributed and is_train else None + shuffle = is_train and sampler is None + # shuffle = True + dataloader = DataLoader( + dataset, + batch_size=args.per_gpu_batch_size if is_train else args.per_gpu_val_batch_size, + shuffle=shuffle, + num_workers=args.workers, + pin_memory=True, + sampler=sampler, + drop_last=is_train, + ) + + dataloader.num_batches = total_num_batches + dataloader.num_samples = total_num_samples + return DataInfo(dataloader=dataloader, shared_epoch=shared_epoch, sampler=sampler) + + def get_dataset_fn(dataset_type): if dataset_type == "synthetic": return get_synthetic_dataset + elif dataset_type == "jsonl": + return get_jsonl_dataloader else: return get_wds_dataset def get_data(args, epoch=0, tokenizer=None, skip_train=False, floor=True): data = {} - if skip_train: data["train"] = None else: if args.train_data or args.dataset_type == "synthetic": # train data is treated as a shard list where all data is combined and tained on + args.train_num_samples = 1000 data["train"] = get_dataset_fn(args.dataset_type)( args, is_train=True, epoch=epoch, tokenizer=tokenizer, data_key=args.data_key, floor=floor ) @@ -711,8 +827,9 @@ def sample_chunk(chunk, args): else: raise Exception(f"Invalid sequence length: Sequence length {args.seq_len} > {chunk.shape[1]} Chunk size") - inputs = chunk[:, start_idx : start_idx + args.seq_len] - targets = chunk[:, start_idx + 1 : start_idx + args.seq_len + 1] + inputs = chunk[:, start_idx: start_idx + args.seq_len] + targets = chunk[:, start_idx + 1: start_idx + args.seq_len + 1] + # replace elements to be masked with with -100 (pytorch default xent ignore value) if args.target_mask_left is not None or args.target_mask_individual is not None: diff --git a/open_lm/datapreprocess/make_assistant_data.py b/open_lm/datapreprocess/make_assistant_data.py index 60cc52d8..46580018 100644 --- a/open_lm/datapreprocess/make_assistant_data.py +++ b/open_lm/datapreprocess/make_assistant_data.py @@ -70,10 +70,17 @@ def dump_queue_to_buffer(): with get_item_reader(file_name) as item_reader: for item in item_reader: - string = item["text"] try: - tokens = remaining_tokens + enc(string) + [eot_token] + # Extract and concatenate the relevant fields + tokens = remaining_tokens + \ + enc(item["QUESTION"]) +\ + enc(item["CONTEXTS"]) +\ + enc(item["LONG_ANSWER"]) +\ + [eot_token] + remaining_tokens = [] + # tokens = torch.tensor(tokens).unsqueeze(0) # Shape: (1, seq_len + 1) + except: print("Failed to encode string.") continue diff --git a/open_lm/evaluate.py b/open_lm/evaluate.py index 6a562724..38c1eaa3 100644 --- a/open_lm/evaluate.py +++ b/open_lm/evaluate.py @@ -55,20 +55,34 @@ def evaluate(model, data, start_epoch, args, writer): if i == dataloader.num_batches and not exhaust_loader: 
break - (texts,) = batch - texts = torch.LongTensor(texts).to(device) data_time_m.update(time.time() - end) with autocast(): - inputs, targets = sample_chunk(texts, args) + if args.dataset_type == "jsonl": + inputs, targets = batch + inputs = torch.LongTensor(inputs).to(device) + targets = torch.LongTensor(targets).to(device) + inputs = inputs[:, :-1] + targets = targets[:, 1:] + if is_master(args) and i == 0: + for target in targets: + print("decode", target[target!=-100]) + else: + (texts,) = batch + texts = torch.LongTensor(texts).to(device) + inputs, targets = sample_chunk(texts, args) out, _, _ = model(inputs) # [per_gpu_bs, seq_len, vocab_size] bs, seq_len = targets.shape + targets = targets.reshape(-1) total_loss = loss(out.reshape(-1, args.vocab_size), targets) # [bs * seq_len] + if is_master(args) and i == 0: + print("total_loss", total_loss) + print("loss not equal to zero", total_loss[total_loss!=0.0]) # cross entropy ignores -100 values in loss computation mask = targets != -100 diff --git a/open_lm/main.py b/open_lm/main.py index 7c80f558..a7fac49c 100644 --- a/open_lm/main.py +++ b/open_lm/main.py @@ -1,3 +1,4 @@ +import argparse import atexit import logging import os @@ -10,11 +11,12 @@ from pathlib import Path import json import traceback - import fsspec import torch from torch import optim from torch.cuda.amp import GradScaler +import os +os.environ["TOKENIZERS_PARALLELISM"] = "false" import torch.distributed as dist @@ -29,7 +31,9 @@ ) from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy -from open_lm.data import proc_token +from transformers import GPTNeoXTokenizerFast + +from open_lm.data import get_dataset_fn, proc_token from open_lm.model import Block from open_lm.losses import CrossEntropyLossWithZLoss from open_lm.utils.averaging_utils import ModelAverager @@ -132,7 +136,7 @@ def load_model(args, model, different_seed=False): model.module.load_state_dict(sd) else: model.load_state_dict(sd) - logging.info(f"=> resuming checkpoint '{args.resume}' (epoch {start_epoch})") + logging.info(f"=> resuming checkpoint '{args.resume}' (epoch {start_epoch})(step {global_step})") else: # loading a bare (model only) checkpoint for fine-tune or evaluation start_epoch, global_step = 0, 0 @@ -302,9 +306,16 @@ def cleanup(sync_process, distributed=False): if distributed and torch.distributed.is_initialized(): torch.distributed.destroy_process_group() +def tokenize_eleutherai(tokenizer, string): + return tokenizer(string).input_ids + def main(args): + # parser = argparse.ArgumentParser() + # parser.add_argument("--tokenizer", type=str, default="EleutherAI/gpt-neox-20b") + args = parse_args(args) + # args = parser.parse_args() requires_training = args.train_data or args.dataset_type == "synthetic" or args.dataset_manifest is not None @@ -468,12 +479,13 @@ def main(args): model = None if args.hf_model is not None: + print("loading huggingface model") model = create_wrapped_hf_model(args) else: # Optional: Use meta device + print("loading customized model") with torch.device("meta" if args.experimental_meta_device and args.fsdp else args.device): model = create_model(args) - args.vocab_size = model.vocab_size args.seq_len = model.seq_len if args.train_num_samples is not None: @@ -586,11 +598,15 @@ def main(args): # optionally resume model from a checkpoint start_epoch, global_step = 0, 0 shard_shuffle_seed = args.seed + if args.resume is not None: + if is_master(args): + logging.info("=> loading from a trained model.") start_epoch, global_step, shard_shuffle_seed = 
load_model(args, model) elif args.pretrained is not None: - print("=> loading from a pre-trained model.") + if is_master(args): + logging.info("=> loading from a pre-trained model.") args.resume = args.pretrained # this flag continues training from the pre-trained model. if args.load_pretrained_state: @@ -598,6 +614,9 @@ def main(args): else: load_model(args, model, different_seed=True) args.resume = None + + if is_master(args): + print("global_step", global_step) elif args.average is not None: num_models_to_average = len(args.average) print( @@ -657,11 +676,21 @@ def main(args): # initialize datasets # use tokenizer=None because the data is already pre-tokenized. + + tokenizer_type = getattr(args, "tokenizer", "EleutherAI/gpt-neox-20b") + enc = GPTNeoXTokenizerFast.from_pretrained(f"{tokenizer_type}") + if is_master(args): + logging.info(f"current encoder name is:{tokenizer_type}") + logging.info(f"padding side is :{enc.padding_side}") + args.padding_side = enc.padding_side + tokenizer = lambda x: tokenize_eleutherai(enc, x) + data = get_data( args, epoch=start_epoch, - tokenizer=None, + # tokenizer=None, + tokenizer=tokenizer, skip_train=args.dataset_manifest is not None, floor=args.dataset_manifest is not None, ) @@ -693,8 +722,10 @@ def main(args): if args.dataset_manifest is not None: total_steps = (args.train_num_samples * args.epochs) // args.global_batch_size else: - total_steps = (data["train"].dataloader.num_batches) * args.epochs - + # would enter this branch + total_steps = data["train"].dataloader.num_batches * args.epochs + if is_master(args): + logging.info(f"total steps required by training is {total_steps}") if args.lr_scheduler == "cosine": scheduler = cosine_lr( optimizer, @@ -774,6 +805,22 @@ def main(args): done_training = global_step >= total_steps epoch = start_epoch num_ckpt_too_few_tokens = 0 + # if "val_list" in data and (epoch % args.val_frequency == 0 or done_training): + # # validate based on frequency and always validate the last checkpoint + # if is_master(args): + # logging.info("Process the eval step") + # try: + # evaluation_metrics = evaluate_loop(model, data["val_list"], epoch, args, writer) + # + # if is_master(args): + # with fsspec.open(os.path.join(args.checkpoint_path, "results.jsonl"), "a") as f: + # f.write(f"{json.dumps(evaluation_metrics)}\n") + # + # except Exception as e: + # if is_master(args): + # logging.error(e) + # logging.error(traceback.format_exc()) + # logging.warning("evaluation failed! continuing to save_checkpoint") while not done_training: if is_master(args): logging.info(f"Start epoch {epoch}") @@ -808,9 +855,12 @@ def main(args): args.train_data = train_data_string_per_source # Draw num_samples_per_source at most from dataset - rounded down to guarantee uniqueness. 
- data["train"] = get_wds_dataset( - args, True, epoch, force_num_samples=num_samples_per_source, data_key=args.data_key, floor=True + data["train"] = get_dataset_fn(args.dataset_type)( + args, is_train=True, epoch=epoch, tokenizer=tokenizer, data_key=args.data_key, floor=True ) + # data["train"] = get_wds_dataset( + # args, True, epoch, force_num_samples=num_samples_per_source, data_key=args.data_key, floor=True + # ) prev_step = global_step if is_master(args): @@ -859,6 +909,8 @@ def main(args): evaluation_metrics = [] if "val_list" in data and (epoch % args.val_frequency == 0 or done_training): # validate based on frequency and always validate the last checkpoint + if is_master(args): + logging.info("Process the eval step") try: evaluation_metrics = evaluate_loop(model, data["val_list"], epoch, args, writer) diff --git a/open_lm/model.py b/open_lm/model.py index 0c979c40..6219f7ff 100644 --- a/open_lm/model.py +++ b/open_lm/model.py @@ -427,7 +427,7 @@ def create_params(args): if args.model.endswith(".json"): _rescan_model_configs(model_config_paths=args.model) args.model = Path(args.model).stem - + # print(f"_MODEL_CONFIGS{_MODEL_CONFIGS}") if args.model in _MODEL_CONFIGS: cfg = deepcopy(_MODEL_CONFIGS[args.model]) else: diff --git a/open_lm/params.py b/open_lm/params.py index 0a7a3f64..0c027230 100644 --- a/open_lm/params.py +++ b/open_lm/params.py @@ -320,7 +320,7 @@ def parse_args(args): ) parser.add_argument( "--dataset-type", - choices=["webdataset", "auto", "synthetic"], + choices=["webdataset", "auto", "synthetic","jsonl"], default="auto", help="Which type of dataset to process.", ) @@ -704,7 +704,7 @@ def parse_args(args): parser.add_argument( "--distill-model", default=None, - help="Which model arch to distill from, if any.", + help="Which model arch to distill from, if any.", ) parser.add_argument( "--distill-pretrained", diff --git a/open_lm/train.py b/open_lm/train.py index 0d54bf70..cdd82ebc 100644 --- a/open_lm/train.py +++ b/open_lm/train.py @@ -62,8 +62,8 @@ def train_one_epoch( data["train"].set_epoch(epoch) # set epoch in process safe manner via sampler or shared_epoch dataloader = data["train"].dataloader num_batches_per_epoch = dataloader.num_batches - sample_digits = math.ceil(math.log(dataloader.num_samples + 1, 10)) + sample_digits = math.ceil(math.log(dataloader.num_samples + 1, 10)) losses_m = AverageMeter() load_balancing_losses_m = AverageMeter() batch_time_m = AverageMeter() @@ -116,31 +116,54 @@ def train_one_epoch( if args.world_size > 1: dist.all_reduce(has_data, op=ReduceOp.SUM) + # if is_master(args): + # print("current has data", has_data) if has_data < args.world_size: break - (texts,) = batch - texts = torch.LongTensor(texts).to(device) + # (texts,) = batch + + # texts = torch.LongTensor(texts).to(device) data_time_m.update(time.time() - end) optimizer.zero_grad() - if args.accum_freq == 1: with autocast(): forward_start = time.time() - inputs, targets = sample_chunk(texts, args) + if args.dataset_type == "jsonl": + inputs, targets = batch + inputs = torch.LongTensor(inputs).to(device) + targets = torch.LongTensor(targets).to(device) + + inputs = inputs[:, :-1] + targets = targets[:, 1:] + assert inputs.size() == targets.size() + if is_master(args): + if i == 0: + logging.info("enter customed jsonl step") + logging.info("inputs id of first forward on") + logging.info("current inputs") + logging.info(inputs[:3, :500]) + logging.info("current targets") + logging.info(targets[:3, :500]) + else: + (texts,) = batch + if is_master(args): + pass + texts = 
torch.LongTensor(texts).to(device) + inputs, targets = sample_chunk(texts, args) out, _, _ = model(inputs) + if is_master(args) and i == 0: + pass forward_time_m.update(time.time() - forward_start) if args.log_logit_mean: logit_m.update(torch.mean(out).item()) - total_lm_loss = loss(out.reshape(-1, args.vocab_size), targets.reshape(-1)) total_loss = total_lm_loss if args.moe_freq > 0: total_load_balancing_loss = batched_load_balancing_loss(moe_args) clear_load_balancing_loss() total_loss += total_load_balancing_loss - backward_start = time.time() backward(total_loss, scaler) backward_time_m.update(time.time() - backward_start) @@ -158,8 +181,9 @@ def train_one_epoch( assert args.per_gpu_batch_size % args.accum_freq == 0, "Per-GPU batch size must be divisible by accum_freq" per_batch = args.per_gpu_batch_size // args.accum_freq - inputs, targets = sample_chunk(texts, args) - + # inputs, targets = sample_chunk(texts, args) + inputs, targets = batch + forward_total_time = 0 backward_total_time = 0 for ii in range(args.accum_freq): @@ -280,8 +304,6 @@ def train_one_epoch( step += 1 if is_master(args): batch_size = len(inputs) - # update the loss meter with the global loss tensor every iteration, so that the logging is of the avg of loss of the last - # args.log_every_n_steps iterations if args.moe_freq > 0: losses_m.update(global_loss_tensor.item() - total_load_balancing_loss.item(), batch_size) load_balancing_losses_m.update(total_load_balancing_loss.item(), batch_size) diff --git a/open_lm/utils/llm_foundry_wrapper.py b/open_lm/utils/llm_foundry_wrapper.py index f4f14e79..166d42a2 100644 --- a/open_lm/utils/llm_foundry_wrapper.py +++ b/open_lm/utils/llm_foundry_wrapper.py @@ -4,14 +4,14 @@ """Implements a Hugging Causal LM wrapped inside a :class:`.ComposerModel`.""" from typing import Mapping, Union -from llmfoundry.eval.metrics.nlp import ( + +from composer.metrics.nlp import ( InContextLearningLMAccuracy, InContextLearningLMExpectedCalibrationError, InContextLearningMCExpectedCalibrationError, InContextLearningMultipleChoiceAccuracy, - InContextLearningGenerationExactMatchAccuracy, -) -from composer.metrics.nlp import ( + InContextLearningQAAccuracy, + InContextLearningCodeEvalAccuracy, LanguageCrossEntropy, LanguagePerplexity, ) @@ -33,9 +33,10 @@ LanguagePerplexity(), InContextLearningLMAccuracy(), InContextLearningMultipleChoiceAccuracy(), - InContextLearningGenerationExactMatchAccuracy(), + InContextLearningQAAccuracy(), InContextLearningLMExpectedCalibrationError(), InContextLearningMCExpectedCalibrationError(), + InContextLearningCodeEvalAccuracy(), ] diff --git a/scripts/generate.py b/scripts/generate.py index 27edfd60..7603e556 100644 --- a/scripts/generate.py +++ b/scripts/generate.py @@ -3,9 +3,11 @@ import argparse import json import builtins as __builtin__ - import torch +import sys, os +current_working_directory = os.getcwd() +sys.path.append(f"{current_working_directory}") from composer.utils import dist, get_device from open_lm.utils.transformers.hf_model import OpenLMforCausalLM from open_lm.utils.transformers.hf_config import OpenLMConfig @@ -14,6 +16,9 @@ from open_lm.params import add_model_args from transformers import GPTNeoXTokenizerFast, LlamaTokenizerFast +import os + + builtin_print = __builtin__.print @@ -21,7 +26,8 @@ @torch.inference_mode() def run_model(open_lm: OpenLMforCausalLM, tokenizer, args): dist.initialize_dist(get_device(None), timeout=600) - input = tokenizer(args.input_text) + input_text_loads = json.loads(args.input_text) + input = 
tokenizer(input_text_loads['instruction'] + input_text_loads['input']) input = {k: torch.tensor(v).unsqueeze(0).cuda() for k, v in input.items()} composer_model = SimpleComposerOpenLMCausalLM(open_lm, tokenizer) composer_model = composer_model.cuda() @@ -32,17 +38,20 @@ def run_model(open_lm: OpenLMforCausalLM, tokenizer, args): "max_new_tokens": args.max_gen_len, "use_cache": args.use_cache, "num_beams": args.num_beams, + } # If these are set when temperature is 0, they will trigger a warning and be ignored if args.temperature > 0: generate_args["temperature"] = args.temperature generate_args["top_p"] = args.top_p - + print("input_ids:", input["input_ids"]) output = composer_model.generate( input["input_ids"], **generate_args, + eos_token_id=[0], ) - output = tokenizer.decode(output[0].cpu().numpy()) + print(f'current output:{output[0][len(input["input_ids"][0]): -1].cpu().numpy()}') + output = tokenizer.decode(output[0][len(input["input_ids"][0]): -1].cpu().numpy()) print("-" * 50) print("\t\t Model output:") print("-" * 50) @@ -56,7 +65,7 @@ def main(): parser.add_argument("--input-text", required=True) parser.add_argument("--max-gen-len", default=200, type=int) - parser.add_argument("--temperature", default=0.8, type=float) + parser.add_argument("--temperature", default=0.7, type=float) parser.add_argument("--top-p", default=0.95, type=float) parser.add_argument("--use-cache", default=False, action="store_true") parser.add_argument("--tokenizer", default="EleutherAI/gpt-neox-20b", type=str)
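
The sketches below are illustrative reviewer notes, not part of the patch; every concrete value, prompt, and stand-in helper in them is an assumption made for demonstration.

First, a minimal, self-contained illustration of how the new JSONLDataset in open_lm/data.py turns one JSONL record into fixed-length (input, target) token lists: prompt positions are masked with -100, the answer is followed by the EOT token, the sequence is truncated from the left, and padding follows padding_side. The whitespace tokenizer and the example record are stand-ins, not the real GPT-NeoX tokenizer.

    import json

    SEQ_LEN = 16
    EOT, PAD, IGNORE = 0, 1, -100  # values hard-coded in JSONLDataset.__init__

    def toy_tokenize(text):
        # stand-in tokenizer: one fake id per whitespace-separated word
        return [2 + (hash(w) % 1000) for w in text.split()]

    record = json.loads('{"instruction": "Answer briefly: ", "input": "Is water wet?", "output": "Yes."}')

    prompt = toy_tokenize(record["instruction"] + record["input"])
    answer = toy_tokenize(record["output"]) + [EOT]

    # mirror of create_chunks(): targets ignore the prompt, keep the answer + EOT
    inputs = (prompt + answer)[-SEQ_LEN:]
    targets = ([IGNORE] * len(prompt) + answer)[-SEQ_LEN:]

    # mirror of pad_input() with padding_side == "right"
    inputs += [PAD] * (SEQ_LEN - len(inputs))
    targets += [IGNORE] * (SEQ_LEN - len(targets))

    print(inputs)
    print(targets)  # loss is only computed where targets != -100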
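get_jsonl_dataloader derives dataloader.num_batches and dataloader.num_samples from the dataset length before handing them to the training loop; a small sketch of that arithmetic for the training path, with made-up sizes:

    import math

    dataset_len = 10_000                              # len(JSONLDataset)
    per_gpu_batch_size, world_size, workers = 8, 4, 2
    floor = True

    global_batch_size = per_gpu_batch_size * world_size
    round_fn = math.floor if floor else math.ceil

    num_worker_batches = round_fn(dataset_len / (global_batch_size * max(1, workers)))
    num_batches = num_worker_batches * max(1, workers)
    num_samples = num_batches * global_batch_size

    print(num_batches, num_samples)                   # attached to dataloader.num_batches / .num_samples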
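In train.py and evaluate.py, jsonl batches skip sample_chunk and instead do the next-token shift inline before a cross-entropy that ignores -100 targets, so neither the prompt nor the padding contributes to the fine-tuning loss. A sketch with made-up tensors:

    import torch
    import torch.nn.functional as F

    input_ids  = torch.tensor([[5, 6, 7, 0, 1, 1]])            # prompt, answer, EOT, padding
    target_ids = torch.tensor([[-100, -100, 7, 0, -100, -100]])

    inputs, targets = input_ids[:, :-1], target_ids[:, 1:]     # predict token t+1 from tokens <= t
    assert inputs.size() == targets.size()

    vocab_size = 8
    logits = torch.randn(inputs.shape[0], inputs.shape[1], vocab_size)

    # positions whose target is -100 (prompt and padding) contribute no loss
    loss = F.cross_entropy(logits.reshape(-1, vocab_size), targets.reshape(-1), ignore_index=-100)
    print(loss.item())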
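main.py now builds the tokenizer passed to get_data from GPTNeoXTokenizerFast instead of passing tokenizer=None; a sketch of that callable (downloading the tokenizer needs network access, and the prompt string is illustrative):

    from transformers import GPTNeoXTokenizerFast

    enc = GPTNeoXTokenizerFast.from_pretrained("EleutherAI/gpt-neox-20b")
    tokenizer = lambda text: enc(text).input_ids   # same shape as tokenize_eleutherai(enc, text)

    print(tokenizer("Answer briefly: Is water wet?")[:8], "| padding_side =", enc.padding_side)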
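scripts/generate.py now parses --input-text as a JSON object and tokenizes instruction + input, matching the JSONL schema used for fine-tuning. A sketch of building that argument; the prompt is an example and the remaining model/checkpoint flags are omitted because only --input-text changed shape:

    import json, shlex

    payload = {"instruction": "Answer briefly: ", "input": "Is water wet?"}
    print("--input-text " + shlex.quote(json.dumps(payload)))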