Add multi-gpu inference option #125

Draft: wants to merge 2 commits into main
7 changes: 7 additions & 0 deletions README.md
@@ -85,6 +85,13 @@ python scripts/run_rm.py --model=PKU-Alignment/beaver-7b-v1.0-cost --chat_templa
python scripts/run_rm.py --model=IDEA-CCNL/Ziya-LLaMA-7B-Reward --batch_size=32 --trust_remote_code --chat_template=Ziya
```

To launch a multi-GPU inference job with Accelerate, run:
```
accelerate launch scripts/run_rm_mgpu.py --model=OpenAssistant/oasst-rm-2.1-pythia-1.4b-epoch-2.5 --chat_template=oasst_pythia
```
Optionally, pass `--num_processes=2` if Accelerate does not detect the number of GPUs automatically.
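For example, to run the same job explicitly on two GPUs:
```
accelerate launch --num_processes=2 scripts/run_rm_mgpu.py --model=OpenAssistant/oasst-rm-2.1-pythia-1.4b-epoch-2.5 --chat_template=oasst_pythia
```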


To run these models with AI2 infrastructure, run:
```
python scripts/submit_eval_jobs.py
13 changes: 2 additions & 11 deletions scripts/run_rm.py
@@ -20,7 +20,6 @@
import numpy as np
import torch
import transformers
-from accelerate import Accelerator
from accelerate.logging import get_logger
from fastchat.conversation import get_conv_template
from tqdm import tqdm
@@ -76,9 +75,6 @@ def main():
###############
# Setup logging
###############
-accelerator = Accelerator()
-current_device = accelerator.process_index
-
logger = get_logger(__name__)
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
@@ -163,11 +159,11 @@ def main():
if quantized:
model_kwargs = {
"load_in_8bit": True,
-"device_map": {"": current_device},
+"device_map": "auto",
"torch_dtype": torch.float16 if torch.cuda.is_available() else None,
}
else:
-model_kwargs = {"device_map": {"": current_device}}
+model_kwargs = {"device_map": "auto"}

model = model_builder(args.model, **model_kwargs, trust_remote_code=trust_remote_code)
reward_pipe = pipeline_builder(
@@ -199,8 +195,6 @@ def main():
if pipeline_builder == pipeline:
logger.info("*** Running forward pass via built in pipeline abstraction ***")
# this setup can be optimized slightly with one pipeline call
-# prepare for inference
-reward_pipe = accelerator.prepare(reward_pipe)

results_rej = reward_pipe(dataset["text_rejected"], **reward_pipeline_kwargs)
results_cho = reward_pipe(dataset["text_chosen"], **reward_pipeline_kwargs)
@@ -237,9 +231,6 @@ def custom_collate_fn(batch):
drop_last=False,
)

-dataloader, model = accelerator.prepare(dataloader, reward_pipe.model)
-reward_pipe.model = model
-
results = []
scores_chosen = []
scores_rejected = []
330 changes: 330 additions & 0 deletions scripts/run_rm_mgpu.py
@@ -0,0 +1,330 @@
# Copyright 2023 AllenAI. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import os
import sys

import numpy as np
import torch
import transformers
from accelerate import Accelerator
from accelerate.logging import get_logger
from fastchat.conversation import get_conv_template
from accelerate.utils import gather_object
from torch.utils.data.dataloader import default_collate
from tqdm import tqdm
from transformers import AutoTokenizer

from rewardbench import (
REWARD_MODEL_CONFIG,
check_tokenizer_chat_template,
load_eval_dataset,
save_to_hub,
)
from rewardbench.constants import EXAMPLE_COUNTS, SUBSET_MAPPING
from rewardbench.utils import calculate_scores_per_section

# get token from HF_TOKEN env variable, but if it doesn't exist pass none
HF_TOKEN = os.getenv("HF_TOKEN", None)
# this is necessary to automatically log in when running this script in docker/batch beaker jobs
if HF_TOKEN is not None:
from huggingface_hub._login import _login

_login(token=HF_TOKEN, add_to_git_credential=False)


def get_args():
"""
Parse argument strings for model and chat_template
"""
parser = argparse.ArgumentParser()
parser.add_argument("--model", type=str, required=True, help="path to model")
parser.add_argument("--tokenizer", type=str, default=None, help="path to non-matching tokenizer to model")
parser.add_argument("--chat_template", type=str, default="tulu", help="path to chat template")
parser.add_argument(
"--trust_remote_code", action="store_true", default=False, help="trust remote code when loading the model and tokenizer"
)
parser.add_argument("--do_not_save", action="store_true", help="do not save results to hub (for debugging)")
parser.add_argument("--batch_size", type=int, default=64, help="batch size for inference")
parser.add_argument("--max_length", type=int, default=2048, help="Max length of RM inputs (passed to pipeline)")
parser.add_argument(
"--pref_sets", action="store_true", help="run on common preference sets instead of our custom eval set"
)
parser.add_argument(
"--debug", action="store_true", help="debug mode: run on only the first 10 examples"
)
parser.add_argument(
"--disable_beaker_save", action="store_true", help="disable saving the main results in a file for AI2 Beaker"
)
args = parser.parse_args()
return args


def main():
args = get_args()
###############
# Setup logging
###############
accelerator = Accelerator()
current_device = accelerator.process_index
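# process_index is this process's rank; it is used below to pin one full model copy per GPU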

logger = get_logger(__name__)
logging.basicConfig(
format="%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
handlers=[logging.StreamHandler(sys.stdout)],
)
log_level = logging.INFO
logger.setLevel(log_level)
transformers.utils.logging.set_verbosity(log_level)
transformers.utils.logging.enable_default_handler()
transformers.utils.logging.enable_explicit_format()

logger.info(f"Running reward model on {args.model} with chat template {args.chat_template}")
if args.trust_remote_code:
logger.info("Loading model with Trust Remote Code")

# load chat template
chat_template = args.chat_template
conv = get_conv_template(chat_template)

if args.model in REWARD_MODEL_CONFIG:
config = REWARD_MODEL_CONFIG[args.model]
else:
config = REWARD_MODEL_CONFIG["default"]
logger.info(f"Using reward model config: {config}")

# Default entries
# "model_builder": AutoModelForSequenceClassification.from_pretrained,
# "pipeline_builder": pipeline,
# "quantized": True,
# "custom_dialogue": False,
# "model_type": "Seq. Classifier"

quantized = config["quantized"] # only Starling isn't quantized for now
custom_dialogue = config["custom_dialogue"]
model_type = config["model_type"]
model_builder = config["model_builder"]
pipeline_builder = config["pipeline_builder"]

# not included in config to make user explicitly understand they are passing this
trust_remote_code = args.trust_remote_code

############################
# Load dataset
############################
logger.info("*** Load dataset ***")
tokenizer_path = args.tokenizer if args.tokenizer else args.model
tokenizer = AutoTokenizer.from_pretrained(tokenizer_path, trust_remote_code=args.trust_remote_code)
if not custom_dialogue: # not needed for PairRM / SteamSHP
tokenizer.truncation_side = "left" # copied from Starling, but few samples are above context length
dataset, subsets = load_eval_dataset(
core_set=not args.pref_sets,
conv=conv,
custom_dialogue_formatting=custom_dialogue,
tokenizer=tokenizer,
logger=logger,
keep_columns=["text_chosen", "text_rejected", "id"],
)
# copy id for saving, then remove
ids = dataset["id"]
dataset = dataset.remove_columns("id")

# debug: use only 10 examples
if args.debug:
dataset = dataset.select(range(10))
subsets = subsets[:10]
ids = ids[:10]

############################
# Load reward model pipeline
############################
BATCH_SIZE = args.batch_size
logger.info("*** Load reward model ***")
reward_pipeline_kwargs = {
"batch_size": BATCH_SIZE, # eval_args.inference_batch_size,
"truncation": True,
"padding": True,
"max_length": args.max_length,
"function_to_apply": "none", # Compute raw logits
"return_token_type_ids": False,
}
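# each process loads a full copy of the reward model on its own GPU (data parallelism, not model sharding)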
if quantized:
model_kwargs = {
"load_in_8bit": True,
"device_map": {"": current_device},
"torch_dtype": torch.float16 if torch.cuda.is_available() else None,
}
else:
model_kwargs = {"device_map": {"": current_device}}

model = model_builder(args.model, **model_kwargs, trust_remote_code=trust_remote_code)
reward_pipe = pipeline_builder(
"text-classification",
model=model,
tokenizer=tokenizer,
)

############################
# Tokenization settings & dataset preparation
############################
# set pad token to eos token if not set
if reward_pipe.tokenizer.pad_token_id is None:
reward_pipe.model.config.pad_token_id = reward_pipe.tokenizer.eos_token_id
reward_pipe.tokenizer.pad_token_id = reward_pipe.tokenizer.eos_token_id
# For models whose config does not contain `pad_token_id`
if reward_pipe.model.config.pad_token_id is None:
reward_pipe.model.config.pad_token_id = reward_pipe.tokenizer.pad_token_id

# if using fastchat template (no template in tokenizer), make the RM tokenizer output an EOS token
if not check_tokenizer_chat_template(tokenizer):
reward_pipe.tokenizer.add_eos_token = True

############################
# Run inference
############################
# TODO make more custom pipelines work with pre-tokenized data
# for PairRM, hmm, will move all of this later
def custom_collate_fn(batch):
# check if ['text_chosen'] is in first batch element
# Check if the first element of the batch is a dictionary
if isinstance(batch[0]["text_chosen"][0], dict):
return batch # Return the batch as-is if it's a list of dicts
else:
return default_collate(batch) # Use the default collate behavior otherwise

accelerator.wait_for_everyone()
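# split_between_processes gives each rank a contiguous slice of the dataset;
# per-rank results are concatenated back in rank order after the loop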
with accelerator.split_between_processes(dataset) as dataset_sub:

# split_between_processes will not work with DataLoader
dataloader = torch.utils.data.DataLoader(
dataset_sub,
batch_size=BATCH_SIZE,
collate_fn=custom_collate_fn, # if not args.pref_sets else None,
shuffle=False,
drop_last=False,
)

dataloader, model = accelerator.prepare(dataloader, reward_pipe.model)
reward_pipe.model = model.module
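# note: prepare() wraps the model in DistributedDataParallel, so .module is needed to hand
# the underlying model back to the text-classification pipeline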

results = []
scores_chosen = []
scores_rejected = []
for step, batch in enumerate(tqdm(dataloader, desc="RM batch steps")):
logger.info(f"RM inference step {step}/{len(dataloader)}")

if model_type == "Custom Classifier":
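# custom classifiers (e.g. PairRM-style) score the pair jointly and return only a preference, not per-response scores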
text_rejected = [b["text_rejected"] for b in batch]
text_chosen = [b["text_chosen"] for b in batch]
results_sub = reward_pipe(text_chosen, text_rejected, **reward_pipeline_kwargs)
results.extend(1 if result else 0 for result in results_sub.cpu().numpy().tolist())
scores_chosen.extend([None] * len(results_sub))
scores_rejected.extend([None] * len(results_sub))
else:
rewards_chosen = reward_pipe(batch["text_chosen"], **reward_pipeline_kwargs)
rewards_rejected = reward_pipe(batch["text_rejected"], **reward_pipeline_kwargs)

# for each item in batch, record 1 if chosen > rejected
# extract score from dict within batched results (e.g. logits)
# [{'label': 'LABEL_1', 'score': 0.6826171875},... ]
if isinstance(rewards_chosen[0], dict):
score_chosen_batch = [result["score"] for result in rewards_chosen]
score_rejected_batch = [result["score"] for result in rewards_rejected]
# for classes that directly output scores (custom code)
else:
score_chosen_batch = rewards_chosen.cpu().numpy().tolist()
score_rejected_batch = rewards_rejected.cpu().numpy().tolist()

# log results
results.extend(
1 if chosen > rejected else 0
for chosen, rejected in zip(score_chosen_batch, score_rejected_batch)
)
scores_chosen.extend(score_chosen_batch)
scores_rejected.extend(score_rejected_batch)

# gather per-rank lists back to full-dataset order (gather_object concatenates in rank order,
# matching how split_between_processes sharded the dataset)
results_gathered = gather_object(results)
scores_chosen = gather_object(scores_chosen)
scores_rejected = gather_object(scores_rejected)
if accelerator.is_main_process:
############################
# Print & process results
############################
# add column for results for easy printing
out_dataset = dataset.add_column("results", results_gathered)

# add subsets back (removed so it's not handled by cuda)
out_dataset = out_dataset.add_column("subset", subsets)
out_dataset = out_dataset.add_column("id", ids)

# add scores_chosen and scores_rejected to the dataset
out_dataset = out_dataset.add_column("scores_chosen", scores_chosen)
out_dataset = out_dataset.add_column("scores_rejected", scores_rejected)

# get core dataset
results_grouped = {}
results_grouped["model"] = args.model
results_grouped["model_type"] = model_type
results_grouped["chat_template"] = (
args.chat_template if not check_tokenizer_chat_template(tokenizer) else "tokenizer"
)

# print per subset and log into results_grouped file
present_subsets = np.unique(subsets)
for subset in present_subsets:
subset_dataset = out_dataset.filter(lambda example: example["subset"] == subset)
num_correct = sum(subset_dataset["results"])
num_total = len(subset_dataset["results"])
print(f"{subset}: {num_correct}/{num_total} ({num_correct/num_total})")
results_grouped[subset] = num_correct / num_total

# log leaderboard aggregated results
if not args.pref_sets:
results_leaderboard = calculate_scores_per_section(EXAMPLE_COUNTS, SUBSET_MAPPING, results_grouped)
print(results_leaderboard)

############################
# Upload results to hub
############################
sub_path = "eval-set/" if not args.pref_sets else "pref-sets/"
results_url = save_to_hub(
results_grouped,
args.model,
sub_path,
args.debug,
local_only=args.do_not_save,
save_metrics_for_beaker=not args.disable_beaker_save,
)
if not args.do_not_save:
logger.info(f"Uploaded reward model results to {results_url}")

# upload chosen-rejected with scores
if not model_type == "Custom Classifier": # custom classifiers do not return scores
# create new json with scores and upload
scores_dict = out_dataset.to_dict()
scores_dict["model"] = args.model
scores_dict["model_type"] = model_type
scores_dict["chat_template"] = args.chat_template

sub_path_scores = "eval-set-scores/" if not args.pref_sets else "pref-sets-scores/"

scores_url = save_to_hub(scores_dict, args.model, sub_path_scores, args.debug, local_only=args.do_not_save)
logger.info(f"Uploading chosen-rejected text with scores to {scores_url}")
else:
logger.info("Not uploading chosen-rejected text with scores due to model compatibility")


if __name__ == "__main__":
main()