docs/docs/tutorials/rl_multihop/index.ipynb (43 changes: 14 additions, 29 deletions)
@@ -11,21 +11,8 @@
"For this tutorial, you will also need DSPy's Arbor RL server.\n",
"\n",
"```bash\n",
"> pip install arbor-ai\n",
"> python -m arbor.cli serve --arbor-config arbor.yaml\n",
"```\n",
"\n",
"where you create `arbor.yaml` in your directory, containing a plan like:\n",
"\n",
"```text\n",
"inference:\n",
" gpu_ids: '0'\n",
"\n",
"training:\n",
" gpu_ids: '1, 2'\n",
"```\n",
"\n",
"which assigns GPU 0 for inference and GPUs 1 and 2 for training."
"> pip install -U arbor-ai\n",
"```"
]
},
{
@@ -37,14 +24,16 @@
"import dspy\n",
"from dspy.clients.lm_local_arbor import ArborProvider\n",
"\n",
"import arbor\n",
"arbor_server_info = arbor.init() # Initialize the Arbor server in the background\n",
"\n",
"port = 7453\n",
"local_lm_name = \"Qwen/Qwen2.5-7B-Instruct\"\n",
"local_lm = dspy.LM(\n",
" model=f\"openai/arbor:{local_lm_name}\",\n",
" provider=ArborProvider(),\n",
" temperature=0.7,\n",
" api_base=f\"http://localhost:{port}/v1/\",\n",
" api_key=\"arbor\",\n",
" api_base=arbor_server_info[\"api_base\"],\n",
")\n",
"\n",
"dspy.configure(lm=local_lm)\n",
@@ -238,17 +227,18 @@
"outputs": [],
"source": [
"from dspy.teleprompt.grpo import GRPO\n",
"from dspy.clients.utils_finetune import MultiGPUConfig\n",
"\n",
"program = ResearchHop(num_docs=4, num_hops=2)\n",
"program.set_lm(local_lm)\n",
"\n",
"# NOTE: Training on 6 GPUs.\n",
"train_kwargs = {\n",
" \"per_device_train_batch_size\": 2,\n",
" \"gradient_accumulation_steps\": 4,\n",
" \"temperature\": 0.7,\n",
" \"gradient_accumulation_steps\": 8,\n",
" \"temperature\": 1.0,\n",
" \"beta\": 0.04,\n",
" \"learning_rate\": 2e-5,\n",
" \"learning_rate\": 1e-5,\n",
" \"gradient_checkpointing\": True,\n",
" \"gradient_checkpointing_kwargs\": {\"use_reentrant\": False},\n",
" \"bf16\": True,\n",
@@ -262,16 +252,16 @@
"\n",
"compiler = GRPO(\n",
" metric=recall,\n",
" multitask=True,\n",
" num_dspy_examples_per_grpo_step=6,\n",
" num_samples_per_input=8,\n",
" num_rollouts_per_grpo_step=4,\n",
" exclude_demos=True,\n",
" num_train_steps=500,\n",
" num_threads=24,\n",
" num_train_steps=100,\n",
" num_threads=16,\n",
" use_train_as_val=False,\n",
" num_steps_for_val=10,\n",
" train_kwargs=train_kwargs,\n",
" report_train_scores=False,\n",
" gpu_config=MultiGPUConfig(num_inference_gpus=1, num_training_gpus=1),\n",
")\n",
"\n",
"optimized_program = compiler.compile(\n",
@@ -304,11 +294,6 @@
"source": [
"In our preliminary experiments, training above for about 18 hours boosts the recall (devset) from 61.8% to 66.2%. This is _typically_ worse on cost/quality basis than you'd get from running prompt optimizers dspy.MIPROv2 or dspy.SIMBA, but it's still a very solid start for online RL over arbitrary LM programs for small LMs."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": []
}
],
"metadata": {
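The net effect of the notebook change above is that the separate `arbor.yaml` file and CLI server step are replaced by a single in-process call. Below is a minimal sketch of the updated setup, assembled from the cells in the diff; `arbor.init()` and the `"api_base"` key are taken directly from it, and the snippet is illustrative rather than the exact notebook contents.

```python
# Sketch of the updated setup: arbor.init() starts the Arbor server in the
# background and returns connection info, including the api_base that dspy.LM
# should point at.
import arbor
import dspy
from dspy.clients.lm_local_arbor import ArborProvider

arbor_server_info = arbor.init()

local_lm = dspy.LM(
    model="openai/arbor:Qwen/Qwen2.5-7B-Instruct",
    provider=ArborProvider(),
    temperature=0.7,
    api_key="arbor",
    api_base=arbor_server_info["api_base"],
)
dspy.configure(lm=local_lm)
```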
docs/docs/tutorials/rl_papillon/index.ipynb (29 changes: 10 additions, 19 deletions)
@@ -15,21 +15,8 @@
"For this tutorial, you will also need the Arbor RL server.\n",
"\n",
"```bash\n",
"> pip install arbor-ai\n",
"> python -m arbor.cli serve --arbor-config arbor.yaml\n",
"```\n",
"\n",
"where you create `arbor.yaml` in your directory, containing a plan like:\n",
"\n",
"```text\n",
"inference:\n",
" gpu_ids: '0'\n",
"\n",
"training:\n",
" gpu_ids: '1, 2'\n",
"```\n",
"\n",
"which assigns GPU 0 for inference and GPUs 1 and 2 for training."
"> pip install -U arbor-ai\n",
"```"
]
},
{
@@ -41,14 +28,16 @@
"import dspy\n",
"from dspy.clients.lm_local_arbor import ArborProvider\n",
"\n",
"import arbor\n",
"arbor_server_info = arbor.init() # Initialize the Arbor server in the background\n",
"\n",
"port = 7453\n",
"local_lm_name = \"Qwen/Qwen3-1.7B\"\n",
"local_lm_name = \"Qwen/Qwen2.5-7B-Instruct\"\n",
"local_lm = dspy.LM(\n",
" model=f\"openai/arbor:{local_lm_name}\",\n",
" provider=ArborProvider(),\n",
" temperature=0.7,\n",
" api_base=f\"http://localhost:{port}/v1/\",\n",
" api_key=\"arbor\",\n",
" api_base=arbor_server_info[\"api_base\"],\n",
")\n",
"\n",
"dspy.configure(lm=local_lm)\n",
@@ -267,6 +256,7 @@
"outputs": [],
"source": [
"from dspy.teleprompt.grpo import GRPO\n",
"from dspy.clients.utils_finetune import MultiGPUConfig\n",
"\n",
"papillon = PAPILLON(untrusted_model=openai_lm)\n",
"papillon.set_lm(local_lm)\n",
@@ -275,7 +265,7 @@
"train_kwargs = {\n",
" \"per_device_train_batch_size\": 8,\n",
" \"gradient_accumulation_steps\": 4,\n",
" \"temperature\": 0.7,\n",
" \"temperature\": 1.0,\n",
" \"beta\": 0.04,\n",
" \"learning_rate\": 2e-6,\n",
" \"gradient_checkpointing\": True,\n",
@@ -301,6 +291,7 @@
" num_steps_for_val=10,\n",
" train_kwargs=train_kwargs,\n",
" report_train_scores=False,\n",
" gpu_config=MultiGPUConfig(num_inference_gpus=2, num_training_gpus=2),\n",
")\n",
"\n",
"optimized_papillon = compiler.compile(\n",
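As in the multihop tutorial, the only structural change to the GRPO call here is the explicit GPU split. A minimal sketch under stated assumptions: `papillon_metric` and the abbreviated `train_kwargs` below are placeholders for the reward function and training kwargs defined earlier in the notebook, and the omitted compiler arguments keep their values from the cell above.

```python
from dspy.teleprompt.grpo import GRPO
from dspy.clients.utils_finetune import MultiGPUConfig

def papillon_metric(gold, pred, trace=None):
    """Placeholder for the PAPILLON reward defined earlier in the notebook."""
    return 1.0

# Abbreviated stand-in for the train_kwargs dict shown in the cell above.
train_kwargs = {"per_device_train_batch_size": 8, "gradient_accumulation_steps": 4}

compiler = GRPO(
    metric=papillon_metric,
    train_kwargs=train_kwargs,
    report_train_scores=False,
    gpu_config=MultiGPUConfig(num_inference_gpus=2, num_training_gpus=2),
)
```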
dspy/clients/lm.py (6 changes: 3 additions, 3 deletions)
@@ -13,7 +13,7 @@
from dspy.clients.cache import request_cache
from dspy.clients.openai import OpenAIProvider
from dspy.clients.provider import Provider, ReinforceJob, TrainingJob
from dspy.clients.utils_finetune import TrainDataFormat
from dspy.clients.utils_finetune import MultiGPUConfig, TrainDataFormat
from dspy.dsp.utils.settings import settings
from dspy.utils.callback import BaseCallback

@@ -237,14 +237,14 @@ def thread_function_wrapper():

return job

def reinforce(self, train_kwargs) -> ReinforceJob:
def reinforce(self, train_kwargs, gpu_config: MultiGPUConfig = MultiGPUConfig(num_inference_gpus=1, num_training_gpus=1)) -> ReinforceJob:
# TODO(GRPO Team): Should we return an initialized job here?
from dspy import settings as settings

err = f"Provider {self.provider} does not implement the reinforcement learning interface."
assert self.provider.reinforceable, err

job = self.provider.ReinforceJob(lm=self, train_kwargs=train_kwargs)
job = self.provider.ReinforceJob(lm=self, train_kwargs=train_kwargs, gpu_config=gpu_config)
job.initialize()
return job

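With this signature change, callers can pass the GPU split straight through `LM.reinforce`, which hands it to the provider's `ReinforceJob`. A minimal sketch, assuming `local_lm` is an Arbor-backed `dspy.LM` and `train_kwargs` are GRPO-style training kwargs (including `num_generations`, which the Arbor job requires) set up as in the tutorials above:

```python
from dspy.clients.utils_finetune import MultiGPUConfig

# local_lm and train_kwargs are assumed from the tutorial setup above.
job = local_lm.reinforce(
    train_kwargs=train_kwargs,
    gpu_config=MultiGPUConfig(num_inference_gpus=1, num_training_gpus=1),
)
# job is the provider's ReinforceJob (ArborReinforceJob for Arbor); the GRPO
# optimizer drives it via job.step(...) and finally job.terminate().
```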
dspy/clients/lm_local_arbor.py (41 changes: 25 additions, 16 deletions)
@@ -1,13 +1,14 @@
import time
from datetime import datetime
from typing import TYPE_CHECKING, Any, TypedDict
from urllib.parse import urljoin

import openai
import requests

import dspy
from dspy.clients.provider import Provider, ReinforceJob, TrainingJob
from dspy.clients.utils_finetune import GRPOGroup, TrainDataFormat, TrainingStatus, save_data
from dspy.clients.utils_finetune import GRPOGroup, MultiGPUConfig, TrainDataFormat, TrainingStatus, save_data

if TYPE_CHECKING:
from dspy.clients.lm import LM
@@ -70,7 +71,7 @@ class ArborReinforceJob(ReinforceJob):
"lora": False,
}

def __init__(self, lm: "LM", train_kwargs: GRPOTrainKwargs):
def __init__(self, lm: "LM", train_kwargs: GRPOTrainKwargs, gpu_config: MultiGPUConfig = MultiGPUConfig(num_inference_gpus=1, num_training_gpus=1)):
# The teleprompter must ensure that this is set
if "num_generations" not in train_kwargs:
raise ValueError("num_generations must be set in the training kwargs")
@@ -80,6 +81,7 @@ def __init__(self, lm: "LM", train_kwargs: GRPOTrainKwargs):
self.provider_job_id = None
self.checkpoints = {}
self.last_checkpoint = None
self.gpu_config = gpu_config

def initialize(self):
# TODO(GRPO Team): Set provider job ID
@@ -118,6 +120,8 @@ def initialize(self):
api_base = self.lm.kwargs["api_base"]

finetune_model = ArborProvider._remove_provider_prefix(self.lm.model)
# Only multi-GPU is supported for now
gpu_config_type = "multi"
data = {
"model": finetune_model,
"num_generations": num_generations,
@@ -140,8 +144,12 @@
"logging_steps": logging_steps,
"max_context_length": max_context_length,
"lora": lora,
"gpu_config": {
"type": gpu_config_type,
gpu_config_type: self.gpu_config,
},
}
url = f"{api_base}fine_tuning/grpo/initialize"
url = urljoin(api_base, "fine_tuning/grpo/initialize")
headers = {"Content-Type": "application/json"}
response = requests.post(url=url, headers=headers, json=data)
assert response.status_code == 200, f"Failed to initialize GRPO: {response}"
@@ -158,7 +166,7 @@ def _run_grpo_step_one_group(

finetune_model = ArborProvider._remove_provider_prefix(self.lm.model)
data = {"job_id": self.provider_job_id, "model": finetune_model, "batch": train_group}
url = f"{api_base}fine_tuning/grpo/step"
url = urljoin(api_base, f"fine_tuning/grpo/{self.provider_job_id}/step")
headers = {"Content-Type": "application/json"}
response = requests.post(url, headers=headers, json=data)
assert response.status_code == 200, f"Failed to run a GRPO step: {response.text}"
@@ -184,7 +192,7 @@ def step(self, train_data: list[GRPOGroup], train_data_format: TrainDataFormat |

def save_checkpoint(self, checkpoint_name: str, score: float | None = None):
api_base = self.lm.kwargs["api_base"]
url = f"{api_base}fine_tuning/grpo/checkpoint"
url = urljoin(api_base, f"fine_tuning/grpo/{self.provider_job_id}/checkpoint")
headers = {"Content-Type": "application/json"}
body = {"job_id": self.provider_job_id, "checkpoint_name": checkpoint_name}
response = requests.post(url, headers=headers, json=body)
@@ -203,7 +211,7 @@ def save_checkpoint(self, checkpoint_name: str, score: float | None = None):
def terminate(self):
api_base = self.lm.kwargs["api_base"]

url = f"{api_base}fine_tuning/grpo/terminate"
url = urljoin(api_base, f"fine_tuning/grpo/{self.provider_job_id}/terminate")
headers = {"Content-Type": "application/json"}
body = {"job_id": self.provider_job_id}
response = requests.post(url, headers=headers, json=body)
@@ -214,14 +222,15 @@
self.lm.model = ArborProvider._add_provider_prefix(current_model)

def cancel(self):
if ArborProvider.does_job_exist(self.provider_job_id):
status = self.status()
if ArborProvider.is_terminal_training_status(status):
err_msg = "Jobs that are complete cannot be canceled."
err_msg += f" Job with ID {self.provider_job_id} is done."
raise Exception(err_msg)
openai.fine_tuning.jobs.cancel(self.provider_job_id)
self.provider_job_id = None
if self.provider_job_id:
api_base = self.lm.kwargs["api_base"]
url = urljoin(api_base, f"fine_tuning/grpo/{self.provider_job_id}/cancel")
headers = {"Content-Type": "application/json"}
response = requests.post(url, headers=headers)
if response.status_code == 200:
self.provider_job_id = None
else:
raise Exception(f"Failed to cancel GRPO job: {response.text}")

def status(self) -> TrainingStatus:
status = ArborProvider.get_training_status(self.provider_job_id)
@@ -245,7 +254,7 @@ def launch(lm: "LM", launch_kwargs: dict[str, Any] | None = None):
launch_kwargs = launch_kwargs or lm.launch_kwargs

# Make request to launch endpoint
response = requests.post(f"{api_base}chat/launch", json={"model": model, "launch_kwargs": launch_kwargs})
response = requests.post(urljoin(api_base, "chat/launch"), json={"model": model, "launch_kwargs": launch_kwargs})

if response.status_code != 200:
raise Exception(f"Failed to launch model. Status code: {response.status_code}, Response: {response.text}")
@@ -257,7 +266,7 @@ def kill(lm: "LM", launch_kwargs: dict[str, Any] | None = None):
api_base = lm.kwargs["api_base"]

response = requests.post(
f"{api_base}chat/kill",
urljoin(api_base, "chat/kill"),
)

if response.status_code != 200:
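Two mechanical changes run through this file: endpoint URLs are now built with `urljoin` rather than f-string concatenation, and the per-job endpoints (`step`, `checkpoint`, `terminate`, `cancel`) embed the provider job ID in the path. A small self-contained sketch of the resulting URLs, using an illustrative `api_base` and a hypothetical job ID:

```python
from urllib.parse import urljoin

api_base = "http://localhost:7453/v1/"  # illustrative; the real value comes from lm.kwargs["api_base"]
job_id = "grpo-job-123"                 # hypothetical provider_job_id

print(urljoin(api_base, "fine_tuning/grpo/initialize"))
# -> http://localhost:7453/v1/fine_tuning/grpo/initialize

print(urljoin(api_base, f"fine_tuning/grpo/{job_id}/step"))
# -> http://localhost:7453/v1/fine_tuning/grpo/grpo-job-123/step

# urljoin resolves relative to the base path, so the trailing slash matters:
print(urljoin("http://localhost:7453/v1", "fine_tuning/grpo/initialize"))
# -> http://localhost:7453/fine_tuning/grpo/initialize  (the /v1 segment is dropped)
```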
dspy/clients/provider.py (7 changes: 5 additions, 2 deletions)
@@ -3,7 +3,7 @@
from threading import Thread
from typing import TYPE_CHECKING, Any

from dspy.clients.utils_finetune import TrainDataFormat
from dspy.clients.utils_finetune import MultiGPUConfig, TrainDataFormat

if TYPE_CHECKING:
from dspy.clients.lm import LM
@@ -36,11 +36,14 @@ def status(self):


class ReinforceJob:
def __init__(self, lm: "LM", train_kwargs: dict[str, Any] | None = None):
def __init__(self, lm: "LM", train_kwargs: dict[str, Any] | None = None, gpu_config: MultiGPUConfig = MultiGPUConfig(num_inference_gpus=1, num_training_gpus=1)):
self.lm = lm
self.train_kwargs = train_kwargs or {}
self.gpu_config = gpu_config
self.checkpoints = {}
self.last_checkpoint = None

@abstractmethod
def initialize(self):
dspy/clients/utils_finetune.py (7 changes: 7 additions, 0 deletions)
@@ -43,6 +43,13 @@ class GRPOChatData(TypedDict):
GRPOGroup = list[GRPOChatData]


class MultiGPUConfig(TypedDict):
# Number of GPUs to use for inference
num_inference_gpus: int
# Number of GPUs to use for training
num_training_gpus: int


def infer_data_format(adapter: Adapter) -> str:
if isinstance(adapter, dspy.ChatAdapter):
return TrainDataFormat.CHAT
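`MultiGPUConfig` is a `TypedDict`, so it is an ordinary dict at runtime and serializes directly into the JSON body that `ArborReinforceJob.initialize()` sends. A short sketch of what that looks like:

```python
from dspy.clients.utils_finetune import MultiGPUConfig

gpu_config = MultiGPUConfig(num_inference_gpus=1, num_training_gpus=2)
print(gpu_config)                    # {'num_inference_gpus': 1, 'num_training_gpus': 2}
print(isinstance(gpu_config, dict))  # True

# Nested as in the initialize() payload above ("multi" is currently the only type):
request_fragment = {"gpu_config": {"type": "multi", "multi": gpu_config}}
```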
dspy/teleprompt/grpo.py (6 changes: 4 additions, 2 deletions)
@@ -6,7 +6,7 @@
from dspy.adapters.base import Adapter
from dspy.adapters.chat_adapter import ChatAdapter
from dspy.clients.lm import LM
from dspy.clients.utils_finetune import GRPOGroup, TrainDataFormat
from dspy.clients.utils_finetune import GRPOGroup, MultiGPUConfig, TrainDataFormat
from dspy.dsp.utils.settings import settings
from dspy.evaluate.evaluate import Evaluate
from dspy.primitives.example import Example
@@ -41,6 +41,7 @@ def __init__(
format_failure_score: float = -1,
variably_invoked_predictor_grouping_mode: Literal["truncate"] | Literal["fill"] | Literal["ragged"] = "truncate",
variably_invoked_predictor_fill_strategy: Literal["randint"] | Literal["max"] | None = None,
gpu_config: MultiGPUConfig = MultiGPUConfig(num_inference_gpus=1, num_training_gpus=1),
):
super().__init__(train_kwargs=train_kwargs)
self.metric = metric
@@ -57,6 +58,7 @@
self.report_train_scores = report_train_scores
self.failure_score = failure_score
self.format_failure_score = format_failure_score
self.gpu_config = gpu_config

assert failure_score > format_failure_score, "failure_score must be greater than format_failure_score since the range [format_failure_score, failure_score] is used to provide dspy formatting rewards"

@@ -332,7 +334,7 @@ def compile(
job_key = (pred.lm, data_key)
if job_key not in grpo_training_jobs:
train_kwargs = self.train_kwargs[pred.lm]
job = pred.lm.reinforce(train_kwargs=train_kwargs)
job = pred.lm.reinforce(train_kwargs=train_kwargs, gpu_config=self.gpu_config)
grpo_training_jobs[job_key] = job

self.report_validation_metrics(
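Because `gpu_config` defaults to one inference GPU and one training GPU, existing `GRPO` callers are unaffected; the value is simply forwarded to each predictor LM's `reinforce` call during `compile()`, as shown in the hunk above. A brief sketch, with `recall` and `train_kwargs` assumed from the multihop tutorial:

```python
from dspy.teleprompt.grpo import GRPO
from dspy.clients.utils_finetune import MultiGPUConfig

# Default: equivalent to gpu_config=MultiGPUConfig(num_inference_gpus=1, num_training_gpus=1).
compiler = GRPO(metric=recall, train_kwargs=train_kwargs)

# Explicit split; compile() passes it through as
#   pred.lm.reinforce(train_kwargs=..., gpu_config=self.gpu_config)
compiler = GRPO(
    metric=recall,
    train_kwargs=train_kwargs,
    gpu_config=MultiGPUConfig(num_inference_gpus=2, num_training_gpus=2),
)
```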