diff --git a/.gitignore b/.gitignore
index e88e92f..1412450 100644
--- a/.gitignore
+++ b/.gitignore
@@ -129,4 +129,7 @@
 outputs
 tensorboard_log
 ckpt
-.hopeignore
\ No newline at end of file
+.hopeignore
+
+## RL-Factory
+tmp/
\ No newline at end of file
diff --git a/env.sh b/env.sh
new file mode 100644
index 0000000..8e0a22b
--- /dev/null
+++ b/env.sh
@@ -0,0 +1,2 @@
+source /home/ma-user/modelarts/work/jjw/miniconda3-A100/bin/activate
+conda activate rl-factory
\ No newline at end of file
diff --git a/envs/base.py b/envs/base.py
index f5fe9a1..cb2491e 100755
--- a/envs/base.py
+++ b/envs/base.py
@@ -15,6 +15,8 @@ def __init__(self, config):
         self.tool_manager = TOOL_MANAGER_REGISTRY[tool_manager_name](verl_config=config)
         self.max_prompt_length = config.get('max_prompt_length', 2048)
         self.use_verify_tool = False
+        self.use_simulated_user_feedback = config.get('use_simulated_user_feedback', True)
+

     def verify_tool(self, data_source, solution_str, ground_truth, extra_info):
         # If you need a tool to evaluate the generated response, you need to modify the following code
@@ -53,10 +55,19 @@ def _process_data(self, data_item, tokenizer):

     def step(self, responses, tokenizer):
         cur_actions, tool_results = self.tool_manager.execute_actions(responses=responses)
         next_obs, dones, valid_action, is_tool = [], [], [], []
+        user_feedback_flag = []
         for action, tool_result in zip(cur_actions, tool_results):
             if action == 'answer':
-                temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0
+                if self.use_simulated_user_feedback:
+                    if random.random() < self.tool_manager.user_feedback_prob:
+                        user_feedback_flag.append(1)
+                        temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', False, 1, 0
+                    else:
+                        user_feedback_flag.append(0)
+                        temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0
+                else:
+                    temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0
             elif action == 'error':
                 temp_next_obs = self.tool_manager.get_prompt(
                     input_data=tool_result,
@@ -75,14 +86,19 @@ def step(self, responses, tokenizer):
                 temp_done, temp_valid_action, temp_is_tool = False, 1, 1
             else:
                 raise ValueError('Unexpected action: {}'.format(action))
-
             next_obs.append(temp_next_obs)
             dones.append(temp_done)
             valid_action.append(temp_valid_action)
             is_tool.append(temp_is_tool)
-
+            if len(user_feedback_flag) < len(next_obs):
+                user_feedback_flag.append(0)  # keep feedback flags aligned with responses for non-answer actions
+
+        if self.use_simulated_user_feedback:
+            next_obs, dones, valid_action, is_tool = self.tool_manager.simulated_user_feedback(responses, tokenizer, user_feedback_flag, next_obs, dones, valid_action, is_tool)
         return next_obs, dones, valid_action, is_tool
+
+

     def compute_score(self, reward_rollout_wg, reward_tokenizer, tokenizer, data: DataProto, if_val=False):
         if reward_rollout_wg is not None:
             scores = self._compute_score_with_reward_rollout_wg(reward_rollout_wg, reward_tokenizer, data)
diff --git a/envs/simulated_user/generate_dataset.py b/envs/simulated_user/generate_dataset.py
new file mode 100644
index 0000000..e69de29
diff --git a/envs/simulated_user/simulated_user.yaml b/envs/simulated_user/simulated_user.yaml
new file mode 100644
index 0000000..062bfed
--- /dev/null
+++ b/envs/simulated_user/simulated_user.yaml
@@ -0,0 +1,3 @@
+train:
+  data_file: user.jsonl
+  user_feedback_prob: 0.1
diff --git a/envs/simulated_user/user.jsonl b/envs/simulated_user/user.jsonl
new file mode 100644
index 0000000..98dcd3c
--- /dev/null
+++ b/envs/simulated_user/user.jsonl
@@ -0,0 +1,5 @@
["technology", "sports"], "location": "Beijing", "persona": "A 28-year-old man from Beijing who loves technology and sports. He is outgoing, enjoys exploring new things, works at an internet company, and often goes to the gym and attends tech salons in his spare time."} +{"user_id": "u002", "gender": "female", "age": 24, "interests": ["fashion", "travel"], "location": "Shanghai", "persona": "A 24-year-old woman from Shanghai who is a fashion enthusiast. She loves traveling and taking photos, is outgoing, enjoys making friends, and likes sharing her life on social media."} +{"user_id": "u003", "gender": "male", "age": 35, "interests": ["finance", "reading"], "location": "Shenzhen", "persona": "A 35-year-old man from Shenzhen working in the finance industry. He enjoys reading finance books, is steady and focused on self-improvement, and often participates in book clubs in his free time."} +{"user_id": "u004", "gender": "female", "age": 30, "interests": ["cooking", "music"], "location": "Guangzhou", "persona": "A 30-year-old woman from Guangzhou who loves cooking and music. She is gentle, enjoys making delicious food at home, and often goes to concerts to relax."} +{"user_id": "u005", "gender": "male", "age": 22, "interests": ["gaming", "anime"], "location": "Chengdu", "persona": "A 22-year-old male college student from Chengdu who likes gaming and watching anime. He has a good sense of humor and enjoys discussing anime topics with friends."} diff --git a/envs/tool_manager/qwen3_manager.py b/envs/tool_manager/qwen3_manager.py index 7290206..81f0378 100644 --- a/envs/tool_manager/qwen3_manager.py +++ b/envs/tool_manager/qwen3_manager.py @@ -13,6 +13,10 @@ from envs.utils.mcp_manager import MCPManager as SSEMCPManager from qwen_agent.tools import TOOL_REGISTRY, MCPManager, BaseTool from qwen_agent.llm.schema import ASSISTANT, SYSTEM, USER, FUNCTION, ContentItem +import yaml +import random +import os + def parse_mcp_tools_config(file_path): @@ -40,6 +44,21 @@ def __init__(self, verl_config): 'lang': 'en', 'max_input_tokens': 10000 } + self.use_simulated_user_feedback = verl_config.get("use_simulated_user_feedback", True) + if self.use_simulated_user_feedback: + import datasets + from omegaconf import OmegaConf + parent_dir = os.path.dirname(os.path.dirname(__file__)) + user_yaml_path = os.path.join(parent_dir, 'simulated_user', 'simulated_user.yaml') + self.yaml_config = OmegaConf.load(user_yaml_path) + self.user_feedback_prob = self.yaml_config.train.user_feedback_prob + data_file = self.yaml_config.train.data_file + if not os.path.isabs(data_file): + data_file = os.path.join(parent_dir, 'simulated_user', data_file) + if data_file.endswith('.jsonl'): + self.persona_dataset = datasets.load_dataset('json', data_files=data_file)['train'] + else: + raise ValueError(f"Unsupported file type: {data_file}") def get_tool(self, name_or_short_name: str): """通过名称或简写获取工具 @@ -328,7 +347,7 @@ def parse_tools(self, response: str): return parsed_tools def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_prompt=True): - assert mode in ['initial', 'tool_call', 'assistant_response'], 'Invalid mode: {}'.format(mode) + assert mode in ['initial', 'tool_call', 'assistant_response', 'user_feedback'], 'Invalid mode: {}'.format(mode) base_chat = [ {'role': SYSTEM, 'content': 'base'}, {'role': USER, 'content': 'base'}, @@ -342,25 +361,98 @@ def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_promp if mode == 'initial': chat = input_data prompt_with_chat_template = 
             prompt_with_chat_template = tokenizer.apply_chat_template(
-                conversation=chat, tokenize=False, tools=[func.function for func in self.tool_map.values()], 
+                conversation=chat, tokenize=False, tools=[func.function for func in self.tool_map.values()],
                 add_generation_prompt=add_generation_prompt, enable_thinking=self.verl_config.enable_thinking
             )
-        elif mode in ['tool_call', 'assistant_response']:
-            # NOTE: the assistant response might not be used
-            role = 'tool' if mode == 'tool_call' else ASSISTANT
+        elif mode in ['tool_call', 'assistant_response', 'user_feedback']:
+            if mode == 'tool_call':
+                role = 'tool'
+            elif mode == 'user_feedback':
+                role = USER
+            else:
+                role = ASSISTANT
             if type(input_data) == str:
-                chat = {'role': role, 'content': input_data}
+                chat = [{'role': role, 'content': input_data}]
             elif type(input_data) == list:
                 chat = input_data
             else:
                 raise ValueError('Unexpected type of input_data {} ({})'.format(type(input_data), input_data))
-
             temp_prompt_with_chat_template = tokenizer.apply_chat_template(
-                conversation=base_chat + chat, tools=[func.function for func in self.tool_map.values()], 
+                conversation=base_chat + chat, tools=[func.function for func in self.tool_map.values()],
                 tokenize=False, add_generation_prompt=add_generation_prompt, enable_thinking=self.verl_config.enable_thinking
             )
             prompt_with_chat_template = temp_prompt_with_chat_template.replace(base_prompt, '')
         else:
             raise ValueError('Invalid mode: {}'.format(mode))
-
         return prompt_with_chat_template
+
+    async def _single_feedback(self, response: str) -> str:
+        cfg = self.yaml_config
+        feedbacks = cfg.get('feedbacks', [])
+        persona = None
+        persona_str = None
+        try:
+            persona = random.choice(self.persona_dataset)
+            if isinstance(persona, dict):
+                persona_str = persona.get('persona') or str(persona)
+            else:
+                persona_str = str(persona)
+        except Exception:
+            persona_str = None
+        import asyncio
+        from openai import OpenAI
+        api_key = os.getenv("OPENAI_API_KEY")
+        assert api_key is not None, "OPENAI_API_KEY should be set"
+        base_url = os.getenv("OPENAI_BASE_URL", "https://api.deepseek.com")
+        client = OpenAI(api_key=api_key, base_url=base_url)
+        if persona_str:
+            try:
+                prompt = (
+                    f"Your persona is as follows:\n{persona_str}\n\n"
+                    f"Please, in the role of this persona, evaluate and provide feedback on the following AI answer. The feedback should be concise, specific, and constructive, and it should be written in English:\n"
+                    f"AI's answer: {response}\n"
+                    f"Your feedback:"
+                )
+                completion = await asyncio.to_thread(
+                    client.chat.completions.create,
+                    model="deepseek-chat",
+                    messages=[
+                        {"role": "system", "content": "You are a user feedback generator."},
+                        {"role": "user", "content": prompt}
+                    ],
+                    max_tokens=128,
+                    temperature=0.7,
+                )
+                feedback = completion.choices[0].message.content.strip()
+            except Exception as e:
+                feedback = random.choice(feedbacks) if feedbacks else "Please improve your answer."
+        else:
+            feedback = random.choice(feedbacks) if feedbacks else "Please improve your answer."
+        return feedback
+
+    def simulated_user_feedback(self, responses, tokenizer, user_feedback_flag, next_obs, dones, valid_action, is_tool):
+        tmp_responses = []
+        simulated_idx = []
+        for idx, use_feedback in enumerate(user_feedback_flag):
+            if use_feedback == 1:
+                tmp_responses.append(responses[idx])
+                simulated_idx.append(idx)
+        if len(tmp_responses) > 0:
+            import asyncio
+            tasks = [self._single_feedback(response) for response in tmp_responses]
+            try:
+                loop = asyncio.get_running_loop()
+                results = loop.run_until_complete(asyncio.gather(*tasks))
+            except RuntimeError:  # no event loop running in this synchronous caller
+                loop = asyncio.new_event_loop()
+                asyncio.set_event_loop(loop)
+                results = loop.run_until_complete(asyncio.gather(*tasks))
+                loop.close()
+            for idx, result in zip(simulated_idx, results):
+                assert isinstance(next_obs[idx], str)
+                assert isinstance(result, str)
+                next_obs[idx] += self.get_prompt(result, tokenizer, mode='user_feedback')
+                dones[idx] = False
+                valid_action[idx] = 1
+                is_tool[idx] = 1
+        return next_obs, dones, valid_action, is_tool
diff --git a/envs/utils/tool_utils.py b/envs/utils/tool_utils.py
index 11ba6fa..40a60db 100644
--- a/envs/utils/tool_utils.py
+++ b/envs/utils/tool_utils.py
@@ -61,7 +61,7 @@ def postprocess_output(self, output: DataProto, step: int):
             responses=responses_str, tokenizer=self.tokenizer
         )

-        # encode infos for next prompt
+        # encode infos for next prompt TODO: can tokenize be faster?
         info_tokens = self.tokenizer(infos_str).input_ids
         next_prompt_token = []
         next_prompt_length = []
diff --git a/install.sh b/install.sh
index 8971329..326d21d 100755
--- a/install.sh
+++ b/install.sh
@@ -1,4 +1,4 @@
-pip3 install accelerate bitsandbytes datasets deepspeed==0.16.4 einops flash-attn==2.7.0.post2 isort jsonlines loralib optimum packaging peft pynvml>=12.0.0 ray[default]==2.42.0 tensorboard torch torchmetrics tqdm transformers==4.48.3 transformers_stream_generator wandb wheel
+pip3 install accelerate bitsandbytes datasets deepspeed einops isort jsonlines loralib optimum packaging peft pynvml>=12.0.0 ray[default]==2.42.0 tensorboard torchmetrics tqdm transformers==4.48.3 transformers_stream_generator wandb wheel
 pip3 install vllm==0.8.5
 pip3 install "qwen-agent[code_interpreter]"
 pip3 install llama_index bs4 pymilvus infinity_client codetiming tensordict==0.6 omegaconf torchdata==0.10.0 hydra-core easydict dill python-multipart
diff --git a/main_grpo.sh b/main_grpo.sh
index 12c318b..77a62fd 100644
--- a/main_grpo.sh
+++ b/main_grpo.sh
@@ -1,15 +1,24 @@
-set -e -x
-
-export MODEL_PATH=/your/path/to/Qwen/Qwen3-8B
-export REWARD_MODEL_PATH=/your/path/to/Qwen/QwQ-32B
+#!/bin/bash
+source /root/autodl-tmp/RL-Factory/tmp/env_autodl.sh
+ray stop --force
+sleep 5
+export MODEL_PATH=/root/autodl-tmp/models/Qwen/Qwen3-0.6B
+export REWARD_MODEL_PATH=/root/autodl-tmp/models/Qwen/Qwen3-0.6B
+export WANDB_API_KEY=76ecf2334073036f76da7b9e4eb5bbe934767728
+export HYDRA_FULL_ERROR=1
+export RAY_DEBUG=1
+# export RAY_DEBUG="legacy"
 # export VLLM_ATTENTION_BACKEND=XFORMERS
+DATA=/root/autodl-tmp/data/nq_hotpotqa_train
+
+

 python3 -m verl.trainer.main_ppo\
     algorithm.adv_estimator=grpo\
-    data.train_files=data/nq_search/train.parquet\
-    data.val_files=data/nq_search/test.parquet\
-    data.train_batch_size=128\
-    data.max_prompt_length=4096\
+    data.train_files=$DATA/train.parquet\
+    data.val_files=$DATA/test.parquet\
+    data.train_batch_size=32\
+    data.max_prompt_length=1024\
     data.max_response_length=512\
     actor_rollout_ref.model.path=$MODEL_PATH\
     actor_rollout_ref.model.use_remove_padding=True\
@@ -26,7 +35,7 @@ python3 -m verl.trainer.main_ppo\
     actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=16\
     actor_rollout_ref.rollout.tensor_model_parallel_size=1\
     actor_rollout_ref.rollout.name=vllm\
-    actor_rollout_ref.rollout.gpu_memory_utilization=0.75\
+    actor_rollout_ref.rollout.gpu_memory_utilization=0.4\
     actor_rollout_ref.rollout.n=4\
     actor_rollout_ref.rollout.max_turns=2\
     actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=16\
@@ -39,18 +48,18 @@ python3 -m verl.trainer.main_ppo\
     actor_rollout_ref.env.enable_thinking=False\
     actor_rollout_ref.env.config_path=/your/path/to/envs/configs/mcp_tools.pydata\
     reward_rollout.if_use_reward_rollout=False\
-    reward_rollout.rollout.tensor_model_parallel_size=4\
-    reward_rollout.rollout.gpu_memory_utilization=0.75\
+    reward_rollout.rollout.tensor_model_parallel_size=1\
+    reward_rollout.rollout.gpu_memory_utilization=0.4\
     reward_rollout.rollout.model_name=$REWARD_MODEL_PATH\
     reward_rollout.rollout.free_cache_engine=False\
-    reward_rollout.rollout.response_length=2048\
+    reward_rollout.rollout.response_length=512\
     reward_model.reward_manager=parallel\
     algorithm.kl_ctrl.kl_coef=0.001\
     trainer.critic_warmup=0\
     trainer.logger=['tensorboard']\
     trainer.project_name='GRPO_search'\
     trainer.experiment_name='search_with_thinking'\
-    trainer.n_gpus_per_node=8\
+    trainer.n_gpus_per_node=1\
     trainer.nnodes=1\
     trainer.val_before_train=False\
     trainer.default_local_dir=ckpt\
diff --git a/verl/utils/dataset/rl_dataset.py b/verl/utils/dataset/rl_dataset.py
index 53b3abf..0eced06 100755
--- a/verl/utils/dataset/rl_dataset.py
+++ b/verl/utils/dataset/rl_dataset.py
@@ -190,7 +190,7 @@ def __getitem__(self, item):

         # There's a trap here, multi_modal_inputs has to be a dict, not BatchFeature
         row_dict["multi_modal_data"] = multi_modal_data
-        row_dict["multi_modal_inputs"] = dict(model_inputs)
+        row_dict["multi_modal_inputs"] = dict(model_inputs)

         # second_per_grid_ts isn't used for training, just for mrope
         row_dict["multi_modal_inputs"].pop("second_per_grid_ts", None)