Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DeepFM 1n1d example #360

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions RecommenderSystems/deepfm/deepfm_train_eval.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ def str_list(x):

parser.add_argument("--data_dir", type=str, required=True)
parser.add_argument(
"--num_train_samples", type=int, required=True, help="the number of train samples"
"--num_train_samples", type=int, required=True, help="the number of train samples",
)
parser.add_argument(
"--num_val_samples", type=int, required=True, help="the number of validation samples"
"--num_val_samples", type=int, required=True, help="the number of validation samples",
)
parser.add_argument(
"--num_test_samples", type=int, required=True, help="the number of test samples"
Expand All @@ -36,7 +36,7 @@ def str_list(x):
parser.add_argument("--model_load_dir", type=str, default=None, help="model loading directory")
parser.add_argument("--model_save_dir", type=str, default=None, help="model saving directory")
parser.add_argument(
"--save_initial_model", action="store_true", help="save initial model parameters or not"
"--save_initial_model", action="store_true", help="save initial model parameters or not",
)
parser.add_argument(
"--save_model_after_each_eval",
Expand All @@ -46,7 +46,7 @@ def str_list(x):

parser.add_argument("--embedding_vec_size", type=int, default=16, help="embedding vector size")
parser.add_argument(
"--dnn", type=int_list, default="1000,1000,1000,1000,1000", help="dnn hidden units number"
"--dnn", type=int_list, default="1000,1000,1000,1000,1000", help="dnn hidden units number",
)
parser.add_argument("--net_dropout", type=float, default=0.2, help="net dropout rate")
parser.add_argument("--disable_fusedmlp", action="store_true", help="disable fused MLP or not")
Expand All @@ -59,7 +59,7 @@ def str_list(x):
"--batch_size", type=int, default=10000, help="training/evaluation batch size"
)
parser.add_argument(
"--train_batches", type=int, default=75000, help="the maximum number of training batches"
"--train_batches", type=int, default=75000, help="the maximum number of training batches",
)
parser.add_argument("--loss_print_interval", type=int, default=100, help="")

Expand All @@ -83,7 +83,7 @@ def str_list(x):
required=True,
)
parser.add_argument(
"--persistent_path", type=str, required=True, help="path for persistent kv store"
"--persistent_path", type=str, required=True, help="path for persistent kv store",
)
parser.add_argument(
"--store_type",
Expand All @@ -99,15 +99,22 @@ def str_list(x):
)

parser.add_argument(
"--amp", action="store_true", help="enable Automatic Mixed Precision(AMP) training or not"
"--amp", action="store_true", help="enable Automatic Mixed Precision(AMP) training or not",
)
parser.add_argument("--loss_scale_policy", type=str, default="static", help="static or dynamic")

parser.add_argument(
"--disable_early_stop", action="store_true", help="enable early stop or not"
)
parser.add_argument("--save_best_model", action="store_true", help="save best model or not")

parser.add_argument(
"--save_graph_for_serving",
action="store_true",
help="Save Graph and OneEmbedding for serving. ",
)
parser.add_argument(
"--model_serving_path", type=str, required=True, help="Graph object path for model serving",
)
args = parser.parse_args()

if print_args and flow.env.get_rank() == 0:
Expand Down Expand Up @@ -541,7 +548,7 @@ def save_model(subdir):
stop_training = False

cached_eval_batches = prefetch_eval_batches(
f"{args.data_dir}/val", args.batch_size, math.ceil(args.num_val_samples / args.batch_size)
f"{args.data_dir}/val", args.batch_size, math.ceil(args.num_val_samples / args.batch_size),
)

deepfm_module.train()
Expand Down Expand Up @@ -604,10 +611,17 @@ def save_model(subdir):
print("================ Test Evaluation ================")
eval(args, eval_graph, tag="test", cur_step=step, epoch=epoch)

if args.save_graph_for_serving:
del eval_graph
recompiled_eval_graph = compile_eval_graph(args, deepfm_module, tag="test")
eval_state_dict = recompiled_eval_graph.state_dict()
flow.save(recompiled_eval_graph, args.model_serving_path)
flow.save_one_embedding_info(eval_state_dict, args.model_serving_path)


def np_to_global(np):
    """Convert a local numpy array into a global OneFlow tensor on CPU.

    NOTE(review): the parameter name shadows the conventional ``numpy``
    alias ``np``; kept as-is so keyword callers are not broken.

    Args:
        np: a numpy ndarray holding one rank's slice of the batch.

    Returns:
        A global tensor placed on all CPU devices with ``broadcast`` SBP
        (every rank sees the full batch — required by the serving path).
    """
    t = flow.from_numpy(np)
    # This PR changed the SBP from split(0) to broadcast; keep only the
    # broadcast form (the diff residue showed both old and new lines).
    return t.to_global(placement=flow.env.all_device_placement("cpu"), sbp=flow.sbp.broadcast)
MARD1NO marked this conversation as resolved.
Show resolved Hide resolved


def batch_to_global(np_label, np_features, is_train=True):
Expand Down Expand Up @@ -687,6 +701,17 @@ def eval(args, eval_graph, tag="val", cur_step=0, epoch=0, cached_eval_batches=N
return auc, logloss


def compile_eval_graph(args, deepfm_module, tag="val"):
    """Trace a ``DeepFMValGraph`` once so the compiled graph can be saved.

    One forward pass on a single batch from the ``tag`` split triggers the
    static-graph compilation; the traced graph object is returned for use
    with ``flow.save`` / ``flow.save_one_embedding_info``.
    """
    graph = DeepFMValGraph(deepfm_module, args.amp)
    graph.module.eval()
    split_dir = f"{args.data_dir}/{tag}"
    with make_criteo_dataloader(split_dir, args.batch_size, shuffle=False) as loader:
        # Labels are not needed for tracing; only the feature tensor matters.
        _, features = batch_to_global(*next(loader), is_train=False)
        # Serving inference runs on GPU, so place the sample input on CUDA
        # before the tracing pass.
        features = features.to("cuda")
        _ = graph(features)  # the call itself compiles the graph; output unused
    return graph


if __name__ == "__main__":
os.system(sys.executable + " -m oneflow --doctor")
flow.boxing.nccl.enable_all_to_all(True)
Expand Down
25 changes: 25 additions & 0 deletions RecommenderSystems/deepfm/train_deepfm_1d.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env bash
# Train and evaluate DeepFM on a single node / single device, then export
# the traced eval graph (and its OneEmbedding store) for model serving.
#
# Edit the four placeholder paths below before running.
DATA_DIR=/path/to/deepfm_parquet
PERSISTENT_PATH=/path/to/persistent
MODEL_SAVE_DIR=/path/to/model/save/dir
MODEL_SERVING_PATH=/path/to/model_serving/save/dir

# NOTE(review): --table_size_array appears to list per-feature vocabulary
# sizes for the Criteo parquet dataset — verify against your preprocessing.
# --save_graph_for_serving makes the script also write the compiled graph
# to --model_serving_path after training finishes.
python3 deepfm_train_eval.py \
--data_dir $DATA_DIR \
--persistent_path $PERSISTENT_PATH \
--table_size_array "649,9364,14746,490,476707,11618,4142,1373,7275,13,169,407,1376,1460,583,10131227,2202608,305,24,12517,633,3,93145,5683,8351593,3194,27,14992,5461306,10,5652,2173,4,7046547,18,15,286181,105,142572" \
--store_type 'cached_host_mem' \
--cache_memory_budget_mb 1024 \
--batch_size 10000 \
--train_batches 75000 \
--loss_print_interval 100 \
--dnn "1000,1000,1000,1000,1000" \
--net_dropout 0.2 \
--learning_rate 0.001 \
--embedding_vec_size 16 \
--num_train_samples 36672493 \
--num_val_samples 4584062 \
--num_test_samples 4584062 \
--model_save_dir $MODEL_SAVE_DIR \
--save_best_model \
--save_graph_for_serving \
--model_serving_path $MODEL_SERVING_PATH