From cf4994a4b4846c8dece017e1ea3c844b766129d1 Mon Sep 17 00:00:00 2001
From: Nativeatom
Date: Tue, 17 Jun 2025 23:19:35 +0800
Subject: [PATCH] temporal reward filtering

---
 .../rlvr_config.yaml                          | 261 ++++++++++++++++++
 .../run_rlvr_pipeline.sh                      |   5 +
 roll/pipeline/rlvr/rlvr_config.py             |   5 +
 roll/pipeline/rlvr/rlvr_pipeline.py           | 103 +++++++
 4 files changed, 374 insertions(+)
 create mode 100644 examples/qwen2.5-0.5B-rlvr_megatron_temp_reward_filtering/rlvr_config.yaml
 create mode 100755 examples/qwen2.5-0.5B-rlvr_megatron_temp_reward_filtering/run_rlvr_pipeline.sh

diff --git a/examples/qwen2.5-0.5B-rlvr_megatron_temp_reward_filtering/rlvr_config.yaml b/examples/qwen2.5-0.5B-rlvr_megatron_temp_reward_filtering/rlvr_config.yaml
new file mode 100644
index 00000000..2abfa9a6
--- /dev/null
+++ b/examples/qwen2.5-0.5B-rlvr_megatron_temp_reward_filtering/rlvr_config.yaml
@@ -0,0 +1,261 @@
+hydra:
+  run:
+    dir: .
+  output_subdir: null
+
+exp_name: "qwen2.5-0.5B-rlvr-config-temp-reward"
+seed: 42
+logging_dir: ./output/logs
+output_dir: ./output
+system_envs:
+  USE_MODELSCOPE: '1'
+
+checkpoint_config:
+  type: file_system
+  output_dir: /data/cpfs_0/rl_examples/models/${exp_name}
+
+#track_with: wandb
+#tracker_kwargs:
+#  api_key:
+#  project: roll_examples
+#  notes: roll_examples
+#  tags:
+#    - rlvr
+#    - baseline
+
+track_with: tensorboard
+tracker_kwargs:
+  log_dir: /data/oss_bucket_0/rl_examples/llm/tensorboard/roll_exp/rlvr
+
+num_gpus_per_node: 8
+
+max_steps: 500
+save_steps: 100
+logging_steps: 1
+eval_steps: 10
+resume_from_checkpoint: false
+
+
+rollout_batch_size: 64  # prompt
+prompt_length: 2048
+response_length: 4096
+
+num_return_sequences_in_group: 8
+ppo_epochs: 1
+adv_estimator: "reinforce"
+
+# clip
+value_clip: 0.5
+reward_clip: 10
+advantage_clip: 2.0
+dual_clip_loss: true
+
+# normalize
+reward_norm: null
+reward_shift: false
+reward_scale: false
+
+# data mask
+max_len_mask: true
+difficulty_mask: true
+difficulty_low_threshold: 0.1
+difficulty_high_threshold: 0.95
+error_max_len_clip: false
+
+# reward-based prompt filtering
+use_filtering_metric: true
+filtering_metric: "mean_temporal_reward"
+num_recent_reward: 2
+filtering_warmup_epoch: 1
+filtering_max_epoch: 10
+
+# data weight
+difficulty_loss_weight: false
+length_loss_weight: false
+
+# reward
+add_token_level_kl: false
+
+# advantage
+whiten_advantages: true
+
+# dynamic sampling scheduler
+# use_additional_prompts: true
+# max_running_requests: 256
+# is_num_return_sequences_expand: false
+
+pretrain: Qwen/Qwen2.5-7B
+reward_pretrain: Qwen/Qwen2.5-7B
+
+validation:
+  data_args:
+    template: qwen2_5
+    file_name:
+      - data/aime24_25_deal.jsonl
+  generating_args:
+    top_p: 0.6
+    top_k: 50
+    num_beams: 1
+    temperature: 0.6
+    num_return_sequences: 1
+  eval_steps: 10
+
+actor_train:
+  model_args:
+    flash_attn: fa2
+    disable_gradient_checkpointing: false
+    dtype: bf16
+    model_type: ~
+  training_args:
+    learning_rate: 1.0e-6
+    weight_decay: 0
+    per_device_train_batch_size: 1
+    gradient_accumulation_steps: 32
+    warmup_steps: 20
+    num_train_epochs: 50
+  data_args:
+    template: qwen2_5
+    file_name:
+      - data/code_KodCode_data.jsonl
+      - data/llm_judge_Multi-subject-RLVR_deal_new.jsonl
+      - data/math_deepmath_deal.jsonl
+      - data/general_ifeval_train_deal.jsonl
+      - data/general_CrossThink-QA_deal.jsonl
+    domain_interleave_probs:
+      math_rule: 0.4
+      code_sandbox: 0.3
+      llm_judge: 0.1
+      crossthinkqa: 0.1
+      ifeval: 0.1
+    dataset_dir: data
+    messages: messages
+    interleave_probs: "1.0"
+    preprocessing_num_workers: 16
+  strategy_args:
+    strategy_name: megatron_train
+    strategy_config:
+      tensor_model_parallel_size: 1
+      pipeline_model_parallel_size: 1
+      expert_model_parallel_size: 1
+      use_distributed_optimizer: true
+      recompute_granularity: full
+  device_mapping: list(range(0,16))
+  infer_batch_size: 4
+
+actor_infer:
+  model_args:
+    flash_attn: fa2
+    disable_gradient_checkpointing: true
+    dtype: bf16
+  generating_args:
+    max_new_tokens: ${response_length}
+    top_p: 0.99
+    top_k: 100
+    num_beams: 1
+    temperature: 0.99
+    num_return_sequences: ${num_return_sequences_in_group}
+  data_args:
+    template: qwen2_5
+  strategy_args:
+    strategy_name: vllm
+    strategy_config:
+      gpu_memory_utilization: 0.6
+      block_size: 16
+      max_model_len: 8000
+  device_mapping: list(range(0,12))
+  infer_batch_size: 1
+
+reference:
+  model_args:
+    flash_attn: fa2
+    disable_gradient_checkpointing: true
+    dtype: bf16
+    model_type: ~
+  data_args:
+    template: qwen2_5
+  strategy_args:
+    strategy_name: megatron_infer
+    strategy_config:
+      tensor_model_parallel_size: 1
+      pipeline_model_parallel_size: 1
+      expert_model_parallel_size: 1
+  device_mapping: list(range(0,16))
+  infer_batch_size: 8
+
+rewards:
+  crossthinkqa:
+    worker_cls: roll.pipeline.rlvr.rewards.crossthinkqa_rule_reward_worker.CrossThinkQARuleRewardWorker
+    reward_type: soft
+    response_length_penalty_coef: 0.0
+    model_args:
+      model_name_or_path: ${reward_pretrain}
+    data_args:
+      template: qwen2_5
+    tag_included: [crossthinkqa]
+    world_size: 8
+    infer_batch_size: 4
+  ifeval:
+    worker_cls: roll.pipeline.rlvr.rewards.ifeval_rule_reward_worker.GeneralRuleRewardWorker
+    reward_type: soft
+    model_args:
+      model_name_or_path: ${reward_pretrain}
+    data_args:
+      template: qwen2_5
+    tag_included: [ifeval]
+    world_size: 8
+    infer_batch_size: 4
+  math_rule:
+    worker_cls: roll.pipeline.rlvr.rewards.math_rule_reward_worker.MathRuleRewardWorker
+    model_args:
+      model_name_or_path: ${reward_pretrain}
+    data_args:
+      template: qwen2_5
+    tag_included: [deepmath_103k, aime]
+    world_size: 8
+    infer_batch_size: 1
+#    dynamic filter config
+#    query_filter_config:
+#      type: mean_filter
+#      filter_args:
+#        threshold_up: 0.9
+#        threshold_down: 0.1
+  code_sandbox:
+    use_local: true
+    worker_cls: roll.pipeline.rlvr.rewards.code_sandbox_reward_worker.CodeSandboxRewardWorker
+    tag_included: [KodCode]
+    model_args:
+      model_name_or_path: ${reward_pretrain}
+    data_args:
+      template: qwen2_5
+    world_size: 8
+    infer_batch_size: 1
+#    query_filter_config:
+#      type: std_filter
+#      filter_args:
+#        std_threshold: 0
+  llm_judge:
+    # NOTE: llm-as-judge also needs GPUs; it cannot share GPUs with actor_infer
+    worker_cls: roll.pipeline.rlvr.rewards.llm_judge_reward_worker.LLMJudgeRewardWorker
+    judge_prompt: Qwen2.5-7B-Instruct-RLVR-prompt
+    judge_model_type: inference
+    tag_included: [RLVR]
+    model_args:
+      model_name_or_path: AI-ModelScope/Qwen2.5-7B-Instruct-RLVR
+      flash_attn: fa2
+      disable_gradient_checkpointing: true
+      dtype: bf16
+      model_type: trl
+    generating_args:
+      max_new_tokens: 100
+      top_p: 0.8
+      top_k: 50
+      num_beams: 1
+      temperature: 0.8
+      num_return_sequences: 1
+    data_args:
+      template: qwen2_5
+    strategy_args:
+      strategy_name: hf_infer
+      strategy_config: null
+    device_mapping: list(range(12,16))
+    infer_batch_size: 4
\ No newline at end of file
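A quick way to sanity-check the new filtering knobs in this example config (a minimal sketch, not part of the patch; the pipeline itself loads the YAML through Hydra via examples/start_rlvr_pipeline.py):

    from omegaconf import OmegaConf

    # Load the example config and print the prompt-filtering settings it introduces.
    cfg = OmegaConf.load("examples/qwen2.5-0.5B-rlvr_megatron_temp_reward_filtering/rlvr_config.yaml")
    print(cfg.use_filtering_metric)                             # True
    print(cfg.filtering_metric)                                 # mean_temporal_reward
    print(cfg.num_recent_reward)                                # 2
    print(cfg.filtering_warmup_epoch, cfg.filtering_max_epoch)  # 1 10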
diff --git a/examples/qwen2.5-0.5B-rlvr_megatron_temp_reward_filtering/run_rlvr_pipeline.sh b/examples/qwen2.5-0.5B-rlvr_megatron_temp_reward_filtering/run_rlvr_pipeline.sh
new file mode 100755
index 00000000..7c7e9db7
--- /dev/null
+++ b/examples/qwen2.5-0.5B-rlvr_megatron_temp_reward_filtering/run_rlvr_pipeline.sh
@@ -0,0 +1,5 @@
+#!/bin/bash
+set +x
+
+CONFIG_PATH=$(basename $(dirname $0))
+python examples/start_rlvr_pipeline.py --config_path $CONFIG_PATH --config_name rlvr_config
diff --git a/roll/pipeline/rlvr/rlvr_config.py b/roll/pipeline/rlvr/rlvr_config.py
index 17cffec8..7e58f4ef 100644
--- a/roll/pipeline/rlvr/rlvr_config.py
+++ b/roll/pipeline/rlvr/rlvr_config.py
@@ -229,6 +229,11 @@ class RLVRConfig(BaseConfig):
     difficulty_high_threshold: float = field(default=1.0)
     error_max_len_clip: bool = field(default=False)
     error_max_len_threshold: int = field(default=9999999999)
+    use_filtering_metric: bool = field(default=False, metadata={"help": "Apply reward metric for prompt filtering."})
+    filtering_metric: str = field(default=None, metadata={"help": "Prompt filtering metric."})
+    num_recent_reward: int = field(default=0, metadata={"help": "Number of recent rewards used for prompt filtering."})
+    filtering_warmup_epoch: int = field(default=0, metadata={"help": "Minimum epoch at which prompt filtering starts."})
+    filtering_max_epoch: int = field(default=0, metadata={"help": "Maximum epoch up to which prompt filtering is applied."})
 
     def __post_init__(self):
         super().__post_init__()
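For reference, the prompt-filtering rule that rlvr_pipeline.py (next file) applies when filtering_metric is "mean_temporal_reward" can be distilled into a small pure function. This is an illustrative sketch only; the function name and signature are not part of the patch:

    def should_filter_prompt(history, num_recent_reward, filtering_warmup_epoch):
        # history: one prompt's list of {"pass_rate", "epoch", "global_step"} entries,
        # as maintained in self.prompt_reward_by_step[...]["history"] below.
        if len(history) < filtering_warmup_epoch:
            return False
        recent_pass_rates = [h["pass_rate"] for h in history[-num_recent_reward:]]
        # Mirrors the pipeline condition: drop the prompt when its recent pass rates sum to exactly 1.
        return sum(recent_pass_rates) == 1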
diff --git a/roll/pipeline/rlvr/rlvr_pipeline.py b/roll/pipeline/rlvr/rlvr_pipeline.py
index 6011f014..c372cb44 100644
--- a/roll/pipeline/rlvr/rlvr_pipeline.py
+++ b/roll/pipeline/rlvr/rlvr_pipeline.py
@@ -8,6 +8,7 @@
 import datasets
 import ray
 import torch
+import numpy as np
 from codetiming import Timer
 from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
 from ray.util.timer import _Timer
@@ -309,6 +310,27 @@ def __init__(self, pipeline_config: RLVRConfig):
         for domain in self.rewards.keys():
             self.running[domain] = RunningMoments()
 
+    def dataset_info_init(self):
+        self.history_metrics_per_domain_dataset = {}
+        self.prompt_reward_by_step = {}
+        self.data_vol = {}
+        for domain in self.domain_datasets:
+            self.history_metrics_per_domain_dataset[domain] = {
+                "latest_avg_reward": [],
+                "num_prompts": []
+            }
+            self.data_vol[domain] = len(self.domain_datasets[domain])
+        self.step_per_ep = math.ceil(sum(self.data_vol.values()) / self.pipeline_config.rollout_batch_size)
+        self.data_vol_init = self.data_vol.copy()
+        self.data_vol_total_init = sum(self.data_vol_init.values())
+        self.data_idx_filter_total = {}
+
+        for domain in self.domain_datasets:
+            self.data_idx_filter_total[domain] = []
+
+        self.num_prompts = dict([(domain, 0) for domain in self.domain_datasets])
+        self.total_data_vol = sum(self.data_vol.values())
+
     @torch.no_grad()
     def run(self):
         # 计算tokens per second 系统吞吐
@@ -321,12 +343,70 @@ def run(self):
         actor_infer_response_timer = _Timer(window_size=5)
         actor_train_timer = _Timer(window_size=5)
+        self.dataset_info_init()
+        epoch, ep_step_accum = 0, 0
+
         for global_step in range(self.pipeline_config.max_steps):
             if global_step <= self.state.step:
                 global_step += 1
                 continue
             logger.info(f"pipeline step {global_step} start...")
+            if ep_step_accum == self.step_per_ep:
+                epoch += 1
+                ep_step_accum = 0
+
+                if len(self.prompt_reward_by_step) == self.data_vol_total_init:
+                    self.total_data_vol_updated = 0
+                    for domain in self.domain_datasets:
+                        self.history_metrics_per_domain_dataset[domain]["num_prompts"].append(self.num_prompts[domain])
+                        ep_reward = len(self.history_metrics_per_domain_dataset[domain]["latest_avg_reward"])
+                        domain_avg_reward = np.mean([v["history"][ep_reward]["pass_rate"] for k, v in self.prompt_reward_by_step.items() if
+                                                     k.startswith(domain) and len(v["history"]) > ep_reward])
+                        if self.pipeline_config.use_filtering_metric and epoch >= self.pipeline_config.filtering_warmup_epoch and epoch < self.pipeline_config.filtering_max_epoch:
+                            if self.pipeline_config.filtering_metric == "mean_temporal_reward":
+                                domain_idx_to_filter = [idx for idx, reward in self.prompt_reward_by_step.items() if len(reward["history"]) >= self.pipeline_config.filtering_warmup_epoch
+                                                        and sum([x["pass_rate"] for x in reward["history"][-self.pipeline_config.num_recent_reward:]]) == 1]
+                                self.data_idx_filter_total[domain] += [idx.split("|")[1] for idx in domain_idx_to_filter]
+                        self.history_metrics_per_domain_dataset[domain]["latest_avg_reward"].append(domain_avg_reward)
+
+                        if len(self.data_idx_filter_total[domain]):
+                            domain_dataset_filtered = self.domain_datasets[domain].filter(lambda data_i: data_i["id"] not in self.data_idx_filter_total[domain])
+                            logger.info("{} filtered step {}: {}".format(domain, global_step, len(domain_dataset_filtered)))
+                            domain_filter_generate_scheduler = DynamicSamplingScheduler.options(
+                                scheduling_strategy=NodeAffinitySchedulingStrategy(
+                                    node_id=ray.get_runtime_context().get_node_id(),
+                                    soft=False,
+                                )
+                            ).remote(pipeline_config=self.pipeline_config)
+                            ray.get(
+                                domain_filter_generate_scheduler.set_scheduler.remote(
+                                    actor_cluster=self.actor_infer,
+                                    reward_clusters={domain: self.rewards[domain]},
+                                    dataset=domain_dataset_filtered,
+                                    collect_fn_cls=DataCollatorWithPaddingForPaddedKeys,
+                                    collect_fn_kwargs=dict(max_length=self.pipeline_config.prompt_length, padding="max_length"),
+                                    response_filter_fn=lambda data_item, config: True,
+                                    query_filter_fn=query_filter_fn,
+                                    response_callback_fn=domain_filter_generate_scheduler.report_response.remote,
+                                    state=self.state.kv.get(f"scheduler_state_{domain}", None),
+                                )
+                            )
+
+                            # update domain batch size
+                            self.generate_schedulers[domain] = domain_filter_generate_scheduler
+                            self.domain_batch_size[domain] = min(self.domain_batch_size[domain], len(domain_dataset_filtered))
+
+                            self.total_data_vol_updated += len(domain_dataset_filtered)
+                        else:
+                            self.total_data_vol_updated += len(self.domain_datasets[domain])
+
+                    # update how many steps make up one epoch
+                    self.total_data_vol = self.total_data_vol_updated
+                    self.step_per_ep = math.ceil(self.total_data_vol / self.pipeline_config.rollout_batch_size)
+
+                self.num_prompts = dict([(domain, 0) for domain in self.domain_datasets])
+
             metrics_mgr.clear_metrics()
 
             with tps_timer, Timer(name="step_total", logger=None) as step_total_timer:
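A worked example of the criterion in the hunk above, using the should_filter_prompt sketch from earlier (illustrative values; only pass_rate matters for the check, and this example config sets num_recent_reward to 2 and filtering_warmup_epoch to 1). Prompts flagged this way have their ids accumulated in data_idx_filter_total[domain], and the domain dataset is rebuilt via datasets.filter together with a fresh DynamicSamplingScheduler:

    print(should_filter_prompt([{"pass_rate": 0.25}, {"pass_rate": 0.75}], 2, 1))  # True  -> prompt dropped
    print(should_filter_prompt([{"pass_rate": 1.0}, {"pass_rate": 1.0}], 2, 1))    # False -> prompt kept
    print(should_filter_prompt([{"pass_rate": 0.0}], 2, 1))                        # False -> prompt kept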
@@ -369,6 +449,28 @@
                             domain, reduce_metrics(domain_batch.meta_info.pop("metrics", {}))
                         )
                         domain_batches[domain] = domain_batch
+
+                        domain_batch_ids = domain_batch.non_tensor_batch["id"]
+                        domain_batch_scores = domain_batch.batch["scores"]
+
+                        domain_id2rewards = {}
+                        for domain_id, domain_prompt_score in zip(domain_batch_ids, domain_batch_scores):
+                            domain_id_key = "{}|{}".format(domain, domain_id)
+                            if domain_id_key not in self.prompt_reward_by_step:
+                                self.prompt_reward_by_step[domain_id_key] = {"history": [], "meta_metric": []}
+                            if domain_id_key not in domain_id2rewards:
+                                domain_id2rewards[domain_id_key] = []
+                            domain_id2rewards[domain_id_key].append(domain_prompt_score.item())
+
+                        for domain_id_key, rewards in domain_id2rewards.items():
+                            domain = domain_id_key.split("|")[0]
+                            self.prompt_reward_by_step[domain_id_key]["history"].append({
+                                "pass_rate": np.mean(rewards),
+                                "epoch": len(self.prompt_reward_by_step[domain_id_key]["history"]),
+                                "global_step": global_step
+                            })
+                            self.num_prompts[domain] += 1
+
                 generate_output = DataProto.concat([domain_batch for domain_batch in domain_batches.values()])
                 generate_output.meta_info.pop("is_offload_states", None)
@@ -524,6 +626,7 @@
             logger.info(f"pipeline step {global_step} finished")
             global_step += 1
+            ep_step_accum += 1
 
         logger.info("pipeline complete!")
 
     @torch.no_grad()
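For orientation, the bookkeeping maintained by the hunks above ends up shaped roughly like this after two epochs (an illustrative sketch with made-up numbers): keys are "{domain}|{prompt_id}", and each epoch appends one history entry whose pass_rate is the mean score over that prompt's sampled responses.

    prompt_reward_by_step = {
        "math_rule|12345": {
            "history": [
                {"pass_rate": 0.375, "epoch": 0, "global_step": 57},
                {"pass_rate": 0.625, "epoch": 1, "global_step": 121},
            ],
            "meta_metric": [],
        },
    }
    # Per-domain ids already removed from the sampling pool; "12345" qualifies here
    # because its last two pass rates sum to exactly 1 (0.375 + 0.625).
    data_idx_filter_total = {"math_rule": ["12345"], "code_sandbox": []}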