5 changes: 4 additions & 1 deletion .gitignore
@@ -129,4 +129,7 @@ outputs
tensorboard_log
ckpt

.hopeignore
.hopeignore

## RL-Factory
tmp/
2 changes: 2 additions & 0 deletions env.sh
@@ -0,0 +1,2 @@
source /home/ma-user/modelarts/work/jjw/miniconda3-A100/bin/activate
conda activate rl-factory
21 changes: 18 additions & 3 deletions envs/base.py
@@ -15,6 +15,8 @@ def __init__(self, config):
self.tool_manager = TOOL_MANAGER_REGISTRY[tool_manager_name](verl_config=config)
self.max_prompt_length = config.get('max_prompt_length', 2048)
self.use_verify_tool = False
self.use_simulated_user_feedback = config.get('use_simulated_user_feedback', True)


def verify_tool(self, data_source, solution_str, ground_truth, extra_info):
# If you need a tool to evaluate the generated response, you need to modify the following code
@@ -53,10 +55,20 @@ def _process_data(self, data_item, tokenizer):
def step(self, responses, tokenizer):
cur_actions, tool_results = self.tool_manager.execute_actions(responses=responses)
next_obs, dones, valid_action, is_tool = [], [], [], []
user_feedback_flag = []

for action, tool_result in zip(cur_actions, tool_results):
if action == 'answer':
temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0
if self.use_simulated_user_feedback:
if random.random() < self.tool_manager.user_feedback_prob:
user_feedback_flag.append(1)
temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', False, 1, 0
else:
user_feedback_flag.append(0)
temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0
else:
temp_next_obs, temp_done, temp_valid_action, temp_is_tool = '', True, 1, 0
elif action == 'error':
temp_next_obs = self.tool_manager.get_prompt(
input_data=tool_result,
Expand All @@ -75,14 +87,17 @@ def step(self, responses, tokenizer):
temp_done, temp_valid_action, temp_is_tool = False, 1, 1
else:
raise ValueError('Unexpected action: {}'.format(action))

next_obs.append(temp_next_obs)
dones.append(temp_done)
valid_action.append(temp_valid_action)
is_tool.append(temp_is_tool)


if self.use_simulated_user_feedback:
next_obs, dones, valid_action, is_tool = self.tool_manager.simulated_user_feedback(responses, tokenizer, user_feedback_flag, next_obs, dones, valid_action, is_tool)
return next_obs, dones, valid_action, is_tool



def compute_score(self, reward_rollout_wg, reward_tokenizer, tokenizer, data: DataProto, if_val=False):
if reward_rollout_wg is not None:
scores = self._compute_score_with_reward_rollout_wg(reward_rollout_wg, reward_tokenizer, data)
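Note on the new answer branch in step(): with probability user_feedback_prob an 'answer' no longer terminates the rollout; the feedback text itself is appended later by the tool manager. A minimal sketch of that transition (not part of the diff; names follow the hunk above, and it assumes `random` is imported at module level in envs/base.py):

import random

def answer_transition(use_simulated_user_feedback: bool, user_feedback_prob: float):
    """Return (done, wants_feedback) for a response whose action is 'answer'."""
    if use_simulated_user_feedback and random.random() < user_feedback_prob:
        # Keep the rollout alive; tool_manager.simulated_user_feedback() later
        # appends a simulated user turn to this sample's next observation.
        return False, True
    # Default behaviour: an answer ends the rollout.
    return True, False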
Empty file.
3 changes: 3 additions & 0 deletions envs/simulated_user/simulated_user.yaml
@@ -0,0 +1,3 @@
train:
data_file: user.jsonl
user_feedback_prob: 0.1
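For reference, a sketch (not part of the diff) of how this config is read by the tool manager's __init__ below. Note that _single_feedback also looks up an optional feedbacks list of canned replies via cfg.get('feedbacks', []); this file does not define it, so a default string is used when the LLM call fails.

import os
from omegaconf import OmegaConf

# Hypothetical standalone reader mirroring the loading done in qwen3_manager.py.
cfg = OmegaConf.load(os.path.join("envs", "simulated_user", "simulated_user.yaml"))
user_feedback_prob = cfg.train.user_feedback_prob   # 0.1 in the file above
data_file = cfg.train.data_file                      # "user.jsonl", resolved next to the yaml
canned_feedbacks = cfg.get("feedbacks", [])          # optional fallback replies; empty here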
5 changes: 5 additions & 0 deletions envs/simulated_user/user.jsonl
@@ -0,0 +1,5 @@
{"user_id": "u001", "gender": "male", "age": 28, "interests": ["technology", "sports"], "location": "Beijing", "persona": "A 28-year-old man from Beijing who loves technology and sports. He is outgoing, enjoys exploring new things, works at an internet company, and often goes to the gym and attends tech salons in his spare time."}
{"user_id": "u002", "gender": "female", "age": 24, "interests": ["fashion", "travel"], "location": "Shanghai", "persona": "A 24-year-old woman from Shanghai who is a fashion enthusiast. She loves traveling and taking photos, is outgoing, enjoys making friends, and likes sharing her life on social media."}
{"user_id": "u003", "gender": "male", "age": 35, "interests": ["finance", "reading"], "location": "Shenzhen", "persona": "A 35-year-old man from Shenzhen working in the finance industry. He enjoys reading finance books, is steady and focused on self-improvement, and often participates in book clubs in his free time."}
{"user_id": "u004", "gender": "female", "age": 30, "interests": ["cooking", "music"], "location": "Guangzhou", "persona": "A 30-year-old woman from Guangzhou who loves cooking and music. She is gentle, enjoys making delicious food at home, and often goes to concerts to relax."}
{"user_id": "u005", "gender": "male", "age": 22, "interests": ["gaming", "anime"], "location": "Chengdu", "persona": "A 22-year-old male college student from Chengdu who likes gaming and watching anime. He has a good sense of humor and enjoys discussing anime topics with friends."}
110 changes: 101 additions & 9 deletions envs/tool_manager/qwen3_manager.py
@@ -13,6 +13,10 @@
from envs.utils.mcp_manager import MCPManager as SSEMCPManager
from qwen_agent.tools import TOOL_REGISTRY, MCPManager, BaseTool
from qwen_agent.llm.schema import ASSISTANT, SYSTEM, USER, FUNCTION, ContentItem
import yaml
import random
import os
import asyncio  # needed at module level for asyncio.to_thread in _single_feedback



def parse_mcp_tools_config(file_path):
@@ -40,6 +44,21 @@ def __init__(self, verl_config):
'lang': 'en',
'max_input_tokens': 10000
}
self.use_simulated_user_feedback = verl_config.get("use_simulated_user_feedback", True)
if self.use_simulated_user_feedback:
import datasets
from omegaconf import OmegaConf
parent_dir = os.path.dirname(os.path.dirname(__file__))
user_yaml_path = os.path.join(parent_dir, 'simulated_user', 'simulated_user.yaml')
self.yaml_config = OmegaConf.load(user_yaml_path)
self.user_feedback_prob = self.yaml_config.train.user_feedback_prob
data_file = self.yaml_config.train.data_file
if not os.path.isabs(data_file):
data_file = os.path.join(parent_dir, 'simulated_user', data_file)
if data_file.endswith('.jsonl'):
self.persona_dataset = datasets.load_dataset('json', data_files=data_file)['train']
else:
raise ValueError(f"Unsupported file type: {data_file}")

def get_tool(self, name_or_short_name: str):
"""通过名称或简写获取工具
@@ -328,7 +347,7 @@ def parse_tools(self, response: str):
return parsed_tools

def get_prompt(self, input_data, tokenizer, mode='initial', add_generation_prompt=True):
assert mode in ['initial', 'tool_call', 'assistant_response'], 'Invalid mode: {}'.format(mode)
assert mode in ['initial', 'tool_call', 'assistant_response', 'user_feedback'], 'Invalid mode: {}'.format(mode)
base_chat = [
{'role': SYSTEM, 'content': 'base'},
{'role': USER, 'content': 'base'},
@@ -342,25 +361,98 @@
if mode == 'initial':
chat = input_data
prompt_with_chat_template = tokenizer.apply_chat_template(
conversation=chat, tokenize=False, tools=[func.function for func in self.tool_map.values()],
conversation=chat, tokenize=False, tools=[func.function for func in self.tool_map.values()],
add_generation_prompt=add_generation_prompt, enable_thinking=self.verl_config.enable_thinking
)
elif mode in ['tool_call', 'assistant_response']:
# NOTE: the assistant response might not be used
role = 'tool' if mode == 'tool_call' else ASSISTANT
elif mode in ['tool_call', 'assistant_response', 'user_feedback']:
if mode == 'tool_call':
role = 'tool'
elif mode == 'user_feedback':
role = USER
else:
role = ASSISTANT
if type(input_data) == str:
chat = {'role': role, 'content': input_data}
chat = [{'role': role, 'content': input_data}]
elif type(input_data) == list:
chat = input_data
else:
raise ValueError('Unexpected type of input_data {} ({})'.format(type(input_data), input_data))

temp_prompt_with_chat_template = tokenizer.apply_chat_template(
conversation=base_chat + chat, tools=[func.function for func in self.tool_map.values()],
conversation=base_chat + chat, tools=[func.function for func in self.tool_map.values()],
tokenize=False, add_generation_prompt=add_generation_prompt, enable_thinking=self.verl_config.enable_thinking
)
prompt_with_chat_template = temp_prompt_with_chat_template.replace(base_prompt, '')
else:
raise ValueError('Invalid mode: {}'.format(mode))

return prompt_with_chat_template

async def _single_feedback(self, response: str) -> str:
cfg = self.yaml_config
feedbacks = cfg.get('feedbacks', [])
persona = None
persona_str = None
try:
persona = random.choice(self.persona_dataset)
if isinstance(persona, dict):
persona_str = persona.get('persona') or str(persona)
else:
persona_str = str(persona)
except Exception:
persona_str = None
from openai import OpenAI
api_key = os.getenv("OPENAI_API_KEY")
assert api_key is not None, "OPENAI_API_KEY should be set"
base_url = os.getenv("OPENAI_BASE_URL","https://api.deepseek.com")
client = OpenAI(api_key=api_key, base_url=base_url)
if persona_str:
try:
prompt = (
f"Your persona is as follows:\n{persona_str}\n\n"
f"Please, in the role of this persona, evaluate and provide feedback on the following AI answer. The feedback should be concise, specific, and constructive, your feedback should be in English:\n"
f"AI's answer: {response}\n"
f"Your feedback:"
)
completion = await asyncio.to_thread(
client.chat.completions.create,
model="deepseek-chat",
messages=[
{"role": "system", "content": "You are a user feedback generator."},
{"role": "user", "content": prompt}
],
max_tokens=128,
temperature=0.7,
)
feedback = completion.choices[0].message.content.strip()
except Exception as e:
feedback = random.choice(feedbacks) if feedbacks else "Please improve your answer."
else:
feedback = random.choice(feedbacks) if feedbacks else "Please improve your answer."
return feedback

def simulated_user_feedback(self, responses, tokenizer, user_feedback_flag, next_obs, dones, valid_action, is_tool):
tmp_responses = []
simulated_idx = []
for idx, use_feedback in enumerate(user_feedback_flag):
if use_feedback == 1:
tmp_responses.append(responses[idx])
simulated_idx.append(idx)
if len(tmp_responses) > 0:
import asyncio
tasks = [self._single_feedback(response) for response in tmp_responses]
try:
loop = asyncio.get_running_loop()
results = loop.run_until_complete(asyncio.gather(*tasks))
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
results = loop.run_until_complete(asyncio.gather(*tasks))
loop.close()
for idx, result in zip(simulated_idx, results):
assert isinstance(next_obs[idx], str)
assert isinstance(result, str)
# Append the simulated user turn and keep this sample's rollout going.
next_obs[idx] += self.get_prompt(result, tokenizer, mode='user_feedback')
dones[idx] = False
valid_action[idx] = 1
is_tool[idx] = 1
return next_obs, dones, valid_action, is_tool
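The event-loop handling in simulated_user_feedback assumes it is called from synchronous code with no loop already running in the thread. A simpler equivalent sketch (my assumption, not the PR's implementation) that drives the same coroutines:

import asyncio

def gather_feedback(manager, responses_needing_feedback):
    # Run the per-response feedback coroutines to completion from sync code.
    async def _run():
        tasks = [manager._single_feedback(r) for r in responses_needing_feedback]
        return await asyncio.gather(*tasks)
    return asyncio.run(_run())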
2 changes: 1 addition & 1 deletion envs/utils/tool_utils.py
@@ -61,7 +61,7 @@ def postprocess_output(self, output: DataProto, step: int):
responses=responses_str, tokenizer=self.tokenizer
)

# encode infos for next prompt
# encode infos for next prompt TODO: can tokenize be faster?
info_tokens = self.tokenizer(infos_str).input_ids
next_prompt_token = []
next_prompt_length = []
2 changes: 1 addition & 1 deletion install.sh
@@ -1,4 +1,4 @@
pip3 install accelerate bitsandbytes datasets deepspeed==0.16.4 einops flash-attn==2.7.0.post2 isort jsonlines loralib optimum packaging peft pynvml>=12.0.0 ray[default]==2.42.0 tensorboard torch torchmetrics tqdm transformers==4.48.3 transformers_stream_generator wandb wheel
pip3 install accelerate bitsandbytes datasets deepspeed einops isort jsonlines loralib optimum packaging peft pynvml>=12.0.0 ray[default]==2.42.0 tensorboard torchmetrics tqdm transformers==4.48.3 transformers_stream_generator wandb wheel
pip3 install vllm==0.8.5
pip3 install "qwen-agent[code_interpreter]"
pip3 install llama_index bs4 pymilvus infinity_client codetiming tensordict==0.6 omegaconf torchdata==0.10.0 hydra-core easydict dill python-multipart
35 changes: 22 additions & 13 deletions main_grpo.sh
@@ -1,15 +1,24 @@
set -e -x

export MODEL_PATH=/your/path/to/Qwen/Qwen3-8B
export REWARD_MODEL_PATH=/your/path/to/Qwen/QwQ-32B
#!/bin/bash
source /root/autodl-tmp/RL-Factory/tmp/env_autodl.sh
ray stop --force
sleep 5
export MODEL_PATH=/root/autodl-tmp/models/Qwen/Qwen3-0.6B
export REWARD_MODEL_PATH=/root/autodl-tmp/models/Qwen/Qwen3-0.6B
export WANDB_API_KEY=76ecf2334073036f76da7b9e4eb5bbe934767728
export HYDRA_FULL_ERROR=1
export RAY_DEBUG=1
# export RAY_DEBUG="legacy"
# export VLLM_ATTENTION_BACKEND=XFORMERS
DATA=/root/autodl-tmp/data/nq_hotpotqa_train



python3 -m verl.trainer.main_ppo\
algorithm.adv_estimator=grpo\
data.train_files=data/nq_search/train.parquet\
data.val_files=data/nq_search/test.parquet\
data.train_batch_size=128\
data.max_prompt_length=4096\
data.train_files=$DATA/train.parquet\
data.val_files=$DATA/test.parquet\
data.train_batch_size=32\
data.max_prompt_length=1024\
data.max_response_length=512\
actor_rollout_ref.model.path=$MODEL_PATH\
actor_rollout_ref.model.use_remove_padding=True\
@@ -26,7 +35,7 @@ python3 -m verl.trainer.main_ppo\
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=16\
actor_rollout_ref.rollout.tensor_model_parallel_size=1\
actor_rollout_ref.rollout.name=vllm\
actor_rollout_ref.rollout.gpu_memory_utilization=0.75\
actor_rollout_ref.rollout.gpu_memory_utilization=0.4\
actor_rollout_ref.rollout.n=4\
actor_rollout_ref.rollout.max_turns=2\
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=16\
@@ -39,18 +48,18 @@ python3 -m verl.trainer.main_ppo\
actor_rollout_ref.env.enable_thinking=False\
actor_rollout_ref.env.config_path=/your/path/to/envs/configs/mcp_tools.pydata\
reward_rollout.if_use_reward_rollout=False\
reward_rollout.rollout.tensor_model_parallel_size=4\
reward_rollout.rollout.gpu_memory_utilization=0.75\
reward_rollout.rollout.tensor_model_parallel_size=1\
reward_rollout.rollout.gpu_memory_utilization=0.4\
reward_rollout.rollout.model_name=$REWARD_MODEL_PATH\
reward_rollout.rollout.free_cache_engine=False\
reward_rollout.rollout.response_length=2048\
reward_rollout.rollout.response_length=512\
reward_model.reward_manager=parallel\
algorithm.kl_ctrl.kl_coef=0.001\
trainer.critic_warmup=0\
trainer.logger=['tensorboard']\
trainer.project_name='GRPO_search'\
trainer.experiment_name='search_with_thinking'\
trainer.n_gpus_per_node=8\
trainer.n_gpus_per_node=1\
trainer.nnodes=1\
trainer.val_before_train=False\
trainer.default_local_dir=ckpt\
2 changes: 1 addition & 1 deletion verl/utils/dataset/rl_dataset.py
@@ -190,7 +190,7 @@ def __getitem__(self, item):

# There's a trap here, multi_modal_inputs has to be a dict, not BatchFeature
row_dict["multi_modal_data"] = multi_modal_data
row_dict["multi_modal_inputs"] = dict(model_inputs)
row_dict["multiv_modal_inputs"] = dict(model_inputs)

# second_per_grid_ts isn't used for training, just for mrope
row_dict["multi_modal_inputs"].pop("second_per_grid_ts", None)