diff --git a/rock/sdk/sandbox/agent/base.py b/rock/sdk/sandbox/agent/base.py index 01a5cfec..1d0e1cff 100644 --- a/rock/sdk/sandbox/agent/base.py +++ b/rock/sdk/sandbox/agent/base.py @@ -1,8 +1,10 @@ from __future__ import annotations # Postpone annotation evaluation to avoid circular imports. import asyncio +import os import shlex import time +import uuid from abc import ABC, abstractmethod from typing import TYPE_CHECKING @@ -11,7 +13,7 @@ from rock.actions import CreateBashSessionRequest, Observation from rock.actions.sandbox.base import AbstractSandbox from rock.logger import init_logger -from rock.sdk.sandbox.agent.config import AgentBashCommand, DefaultAgentConfig +from rock.sdk.sandbox.agent.config import AgentBashCommand, BaseAgentConfig from rock.sdk.sandbox.model_service.base import ModelService if TYPE_CHECKING: @@ -34,22 +36,15 @@ async def run(self, **kwargs): pass -class DefaultAgent(Agent): +class BaseAgent(Agent): """Base agent class with common initialization and execution logic. - Provides shared functionality for: - - Session management (create, setup) - - Pre/post startup command execution - - ModelService initialization - - Common error handling and logging - - Nohup process execution - Subclasses must implement: - - _install() - specific installation logic - - run() - specific execution logic + - install() + - create_agent_run_cmd(prompt) """ - def __init__(self, sandbox: Sandbox, config: DefaultAgentConfig): + def __init__(self, sandbox: Sandbox, config: BaseAgentConfig): super().__init__(sandbox) self._sandbox = sandbox @@ -61,14 +56,11 @@ def __init__(self, sandbox: Sandbox, config: DefaultAgentConfig): async def init(self): """Initialize the agent environment. - Common flow: - 1. Setup bash session - 2. Execute pre-init commands - 3. Install agent-specific dependencies (via _install) + Flow: + 1. Execute pre-init commands + 2. Setup bash session + 3. Parallel: agent-specific install + ModelService install (if configured) 4. Execute post-init commands - 5. Initialize ModelService if configured - - All installation and post-startup tasks run in parallel with ModelService init. """ sandbox_id = self._sandbox.sandbox_id start_time = time.time() @@ -81,8 +73,10 @@ async def init(self): await self._setup_session() + await self._provision_local_workdir() + # Parallel tasks: agent-specific install + ModelService init - tasks = [self._install()] + tasks = [self.install()] if self.config.model_service_config: tasks.append(self._init_model_service()) @@ -102,19 +96,82 @@ async def init(self): ) raise + async def _provision_local_workdir(self) -> None: + """If config.local_workdir is set, upload it into sandbox /tmp/. + + Assumes sandbox.upload_dir(source_dir, target_dir) exists. + """ + local_workdir = self.config.local_workdir + if not local_workdir: + return + + sandbox_id = self._sandbox.sandbox_id + local_abs = os.path.abspath(local_workdir) + + if not os.path.exists(local_abs): + raise FileNotFoundError(f"config.local_workdir not found: {local_abs}") + if not os.path.isdir(local_abs): + raise ValueError(f"config.local_workdir must be a directory: {local_abs}") + + target_dir = f"/tmp/rock_workdir_{uuid.uuid4().hex}" + + logger.info(f"[{sandbox_id}] Provisioning local_workdir: {local_abs} -> {target_dir}") + + # Clean & create + await self._sandbox.arun( + cmd=f"rm -rf {shlex.quote(target_dir)} && mkdir -p {shlex.quote(target_dir)}", + session=self.agent_session, + ) + + await self._sandbox.process.upload_dir(source_dir=local_abs, target_dir=target_dir) + + self._sandbox_workdir = target_dir + logger.info(f"[{sandbox_id}] sandbox_workdir ready: {target_dir}") + + def get_sandbox_workdir(self) -> str: + """Return sandbox workdir provisioned from config.local_workdir. + + Raises if local_workdir was not configured or provisioning not completed. + """ + if not self._sandbox_workdir: + raise RuntimeError( + "sandbox_workdir is not available. " + "Set config.local_workdir and ensure agent.init() completed successfully." + ) + return self._sandbox_workdir + + async def run( + self, + prompt: str, + ) -> Observation: + """Unified agent run entry. + + Notes: + - BaseAgent only wraps the command with: bash -c . + - Subclass is responsible for composing the full command content + (including `cd ... && ...` if needed). + - BaseAgent does NOT start ModelService; upper layer should call `start_model_service()` if needed. + """ + cmd = await self.create_agent_run_cmd(prompt) + wrapped_cmd = f"bash -c {shlex.quote(cmd)}" + return await self._agent_run( + cmd=wrapped_cmd, + session=self.agent_session, + ) + @abstractmethod - async def _install(self): - """Install agent-specific dependencies and tools. + async def install(self): + """Install agent-specific dependencies and tools.""" + raise NotImplementedError - This method must be implemented by subclasses to handle: - - Package installation (npm, pip, etc.) - - Tool setup and configuration - - Environment preparation + @abstractmethod + async def create_agent_run_cmd(self, prompt: str) -> str: + """Create the command string for this agent run. - Raises: - Exception: If installation fails + Subclass should return a *shell command string* (NOT wrapped by `bash -c`). + If a working directory is needed, subclass should include `cd ... && ...` in the returned string. """ - pass + raise NotImplementedError async def _setup_session(self): """Create and configure the bash session for agent operations.""" @@ -223,13 +280,7 @@ async def _init_model_service(self): logger.error(f"[{sandbox_id}] ModelService initialization failed: {str(e)}", exc_info=True) raise - async def _agent_run( - self, - cmd: str, - session: str, - wait_timeout: int, - wait_interval: int, - ) -> Observation: + async def _agent_run(self, cmd: str, session: str) -> Observation: """Execute agent command in nohup mode with optional ModelService watch. Args: @@ -275,7 +326,10 @@ async def _agent_run( # Wait for agent process to complete logger.debug(f"[{sandbox_id}] Waiting for agent process completion (pid={pid})") success, message = await self._sandbox.wait_for_process_completion( - pid=pid, session=session, wait_timeout=wait_timeout, wait_interval=wait_interval + pid=pid, + session=session, + wait_timeout=self.config.agent_run_timeout, + wait_interval=self.config.agent_run_check_interval, ) # Handle nohup output and return result diff --git a/rock/sdk/sandbox/agent/config.py b/rock/sdk/sandbox/agent/config.py index 04fd4475..fc8a316f 100644 --- a/rock/sdk/sandbox/agent/config.py +++ b/rock/sdk/sandbox/agent/config.py @@ -1,3 +1,5 @@ +import time + from pydantic import BaseModel, Field from rock import env_vars @@ -16,20 +18,24 @@ class AgentBashCommand(BaseModel): timeout_seconds: int = Field(default=300, description="Timeout in seconds for command execution") -class DefaultAgentConfig(AgentConfig): +class BaseAgentConfig(AgentConfig): """Base configuration for all sandbox agents. Provides common configuration fields shared across different agent types. """ + # Unified runtime identifiers (moved from run() args into config) + agent_installed_dir: str = "/installed_agent" + instance_id: str = f"instance-id-{int(time.time())}" + # Session management - agent_session: str = "default-agent-session" + agent_session: str = f"agent-session-{int(time.time())}" + workdir: str = "./" - # Startup/shutdown commands - unified as RunCommand + # Startup/shutdown commands pre_init_bash_cmd_list: list[AgentBashCommand] = [ AgentBashCommand(**agent_bash_cmd) for agent_bash_cmd in env_vars.ROCK_AGENT_PRE_INIT_BASH_CMD_LIST ] - post_init_bash_cmd_list: list[AgentBashCommand] = Field(default_factory=list) # Environment variables for the session @@ -37,3 +43,13 @@ class DefaultAgentConfig(AgentConfig): # Optional ModelService configuration model_service_config: ModelServiceConfig | None = None + + runtime_env_prepare_timeout: int = 300 # seconds + + agent_install_timeout: int = 600 # seconds + + agent_run_timeout: int = 1800 # seconds + + agent_run_check_interval: int = 30 # seconds + + local_workdir: str | None = None # if set, upload local_workdir to sandbox /tmp/ diff --git a/rock/sdk/sandbox/agent/iflow_cli.py b/rock/sdk/sandbox/agent/iflow_cli.py index cf285b84..d08dbfc6 100644 --- a/rock/sdk/sandbox/agent/iflow_cli.py +++ b/rock/sdk/sandbox/agent/iflow_cli.py @@ -9,13 +9,14 @@ from contextlib import contextmanager from typing import TYPE_CHECKING, Any +from typing_extensions import override + from rock import env_vars -from rock.actions import Observation, UploadRequest +from rock.actions import UploadRequest from rock.logger import init_logger -from rock.sdk.sandbox.agent.base import DefaultAgent -from rock.sdk.sandbox.agent.config import DefaultAgentConfig -from rock.sdk.sandbox.client import Sandbox -from rock.sdk.sandbox.utils import arun_with_retry +from rock.sdk.sandbox.agent.base import BaseAgent +from rock.sdk.sandbox.agent.config import BaseAgentConfig +from rock.sdk.sandbox.agent.runtime_env import NodeAgentRuntimeEnv if TYPE_CHECKING: from rock.sdk.sandbox.client import Sandbox @@ -23,7 +24,6 @@ logger = init_logger(__name__) -# Default IFlow settings DEFAULT_IFLOW_SETTINGS: dict[str, Any] = { "selectedAuthType": "openai-compatible", "apiKey": "", @@ -55,24 +55,20 @@ } -class IFlowCliConfig(DefaultAgentConfig): - """IFlow CLI Agent Configuration. - - Inherits common agent configuration and adds IFlow-specific settings. - """ +class IFlowCliConfig(BaseAgentConfig): + """IFlow CLI Agent Configuration.""" agent_type: str = "iflow-cli" - agent_session: str = "iflow-cli-session" - + # Node runtime install npm_install_cmd: str = env_vars.ROCK_AGENT_NPM_INSTALL_CMD - npm_install_timeout: int = 300 - + # iflow-cli install iflow_cli_install_cmd: str = env_vars.ROCK_AGENT_IFLOW_CLI_INSTALL_CMD iflow_settings: dict[str, Any] = DEFAULT_IFLOW_SETTINGS + # NOTE: keep same template; _create_agent_run_cmd will fill session_id/prompt/log_file iflow_run_cmd: str = "iflow -r {session_id} -p {problem_statement} --yolo > {iflow_log_file} 2>&1" iflow_log_file: str = "~/.iflow/session_info.log" @@ -83,33 +79,32 @@ class IFlowCliConfig(DefaultAgentConfig): } -class IFlowCli(DefaultAgent): - """IFlow CLI Agent implementation. - - Manages the lifecycle of IFlow CLI including installation, configuration, - and execution. Supports session resumption for continuing previous work. - """ +class IFlowCli(BaseAgent): + """IFlow CLI Agent implementation with NodeAgentRuntimeEnv.""" def __init__(self, sandbox: Sandbox, config: IFlowCliConfig): - """Initialize IFlow CLI agent. - - Args: - sandbox: Sandbox instance for executing commands - config: IFlowCliConfig instance with agent settings - """ super().__init__(sandbox, config) - self.config: IFlowCliConfig = config - async def _install(self): + # runtime env maintains its own session + self.agent_runtime_env = NodeAgentRuntimeEnv( + sandbox=self._sandbox, + workdir=self.config.agent_installed_dir, + session=self.agent_session, + install_cmd=self.config.npm_install_cmd, + prepare_timeout=self.config.runtime_env_prepare_timeout, + ) + + @override + async def install(self): """Install IFlow CLI and configure the environment. Steps: - 1. Install npm with retry + 1. Prepare Node runtime (npm/node) 2. Configure npm registry - 3. Install iflow-cli with retry + 3. Install iflow-cli 4. Create iflow configuration directories - 5. Generate and upload settings configuration file + 5. Upload settings configuration file """ sandbox_id = self._sandbox.sandbox_id start_time = time.time() @@ -117,19 +112,19 @@ async def _install(self): logger.info(f"[{sandbox_id}] Starting IFlow CLI installation") try: - # Step 1: Install npm - await self._install_npm() + # Step 1: Node runtime + await self._prepare_node_runtime() - # Step 2: Configure npm registry + # Step 2: npm registry await self._configure_npm_registry() - # Step 3: Install iflow-cli + # Step 3: iflow-cli await self._install_iflow_cli_package() - # Step 4: Create configuration directories + # Step 4: config dirs await self._create_iflow_directories() - # Step 5: Upload settings configuration + # Step 5: upload settings await self._upload_iflow_settings() elapsed = time.time() - start_time @@ -143,34 +138,24 @@ async def _install(self): ) raise - async def _install_npm(self): - """Install npm with Node.js binary.""" - sandbox_id = self._sandbox.sandbox_id + async def _prepare_node_runtime(self): step_start = time.time() - self._log_step("Installing npm") - - logger.debug(f"[{sandbox_id}] NPM install command: {self.config.npm_install_cmd[:100]}...") - - await arun_with_retry( - sandbox=self._sandbox, - cmd=f"bash -c {shlex.quote(self.config.npm_install_cmd)}", - session=self.agent_session, - mode="nohup", - wait_timeout=self.config.npm_install_timeout, - error_msg="npm installation failed", - ) + self._log_step("Preparing Node runtime env (npm/node)", step_name="Node Runtime") + await self.agent_runtime_env.prepare() elapsed_step = time.time() - step_start - self._log_step("NPM installation finished", step_name="NPM Install", is_complete=True, elapsed=elapsed_step) + self._log_step("Node runtime env prepared", step_name="Node Runtime", is_complete=True, elapsed=elapsed_step) async def _configure_npm_registry(self): """Configure npm to use mirror registry for faster downloads.""" sandbox_id = self._sandbox.sandbox_id step_start = time.time() - self._log_step("Configuring npm registry") + self._log_step("Configuring npm registry", step_name="NPM Registry") + # registry config doesn't strictly need runtime env, but running under node_env session is fine. + await self.agent_runtime_env.ensure_session() result = await self._sandbox.arun( cmd="npm config set registry https://registry.npmmirror.com", session=self.agent_session, @@ -185,20 +170,16 @@ async def _configure_npm_registry(self): self._log_step("NPM registry configured", step_name="NPM Registry", is_complete=True, elapsed=elapsed_step) async def _install_iflow_cli_package(self): - """Install iflow-cli package globally.""" sandbox_id = self._sandbox.sandbox_id step_start = time.time() - self._log_step("Installing iflow-cli") - + self._log_step("Installing iflow-cli", step_name="IFlow Install") logger.debug(f"[{sandbox_id}] IFlow CLI install command: {self.config.iflow_cli_install_cmd[:100]}...") - await arun_with_retry( - sandbox=self._sandbox, - cmd=f"bash -c {shlex.quote(self.config.iflow_cli_install_cmd)}", - session=self.agent_session, - mode="nohup", - wait_timeout=self.config.npm_install_timeout, + # Use node runtime env to run install cmd (wrap is currently bash -c, but uses node_env session) + await self.agent_runtime_env.run( + cmd=self.config.iflow_cli_install_cmd, + wait_timeout=self.config.agent_install_timeout, error_msg="iflow-cli installation failed", ) @@ -208,11 +189,10 @@ async def _install_iflow_cli_package(self): ) async def _create_iflow_directories(self): - """Create iflow configuration directories.""" sandbox_id = self._sandbox.sandbox_id step_start = time.time() - self._log_step("Creating iflow settings directories") + self._log_step("Creating iflow settings directories", step_name="Create Directories") result = await self._sandbox.arun( cmd="mkdir -p /root/.iflow && mkdir -p ~/.iflow", @@ -224,8 +204,6 @@ async def _create_iflow_directories(self): logger.error(f"[{sandbox_id}] {error_msg}") raise Exception(error_msg) - logger.debug(f"[{sandbox_id}] IFlow settings directories created") - elapsed_step = time.time() - step_start self._log_step( "IFlow configuration directories created", @@ -235,11 +213,10 @@ async def _create_iflow_directories(self): ) async def _upload_iflow_settings(self): - """Generate and upload iflow-settings.json configuration file.""" sandbox_id = self._sandbox.sandbox_id step_start = time.time() - self._log_step("Generating and uploading iflow settings") + self._log_step("Generating and uploading iflow settings", step_name="Upload Settings") with self._temp_iflow_settings_file() as temp_settings_path: await self._sandbox.upload( @@ -252,19 +229,14 @@ async def _upload_iflow_settings(self): elapsed_step = time.time() - step_start self._log_step( - "IFlow settings configuration uploaded", step_name="Upload Settings", is_complete=True, elapsed=elapsed_step + "IFlow settings configuration uploaded", + step_name="Upload Settings", + is_complete=True, + elapsed=elapsed_step, ) @contextmanager def _temp_iflow_settings_file(self): - """Context manager for creating temporary iflow settings file. - - Creates a temporary JSON file with the configured IFlow settings - and ensures cleanup after use. - - Yields: - str: Path to the temporary settings file - """ settings_content = json.dumps(self.config.iflow_settings, indent=2) with tempfile.NamedTemporaryFile(mode="w", suffix="_iflow_settings.json", delete=False) as temp_file: @@ -277,69 +249,41 @@ def _temp_iflow_settings_file(self): os.unlink(temp_settings_path) async def _get_session_id_from_sandbox(self) -> str: - """Retrieve session ID from IFlow log file in sandbox. - - Fetches the last 1000 lines of the log file and extracts the session ID. - Returns empty string if log file is empty, not found, or parsing fails. - - Returns: - Session ID string if found, empty string otherwise - """ sandbox_id = self._sandbox.sandbox_id logger.info(f"[{sandbox_id}] Retrieving session ID from sandbox log file") try: log_file_path = self.config.iflow_log_file - logger.debug(f"[{sandbox_id}] Reading log file: {log_file_path}") - result = await self._sandbox.arun( cmd=f"tail -1000 {log_file_path} 2>/dev/null || echo ''", session=self.agent_session, ) log_content = result.output.strip() - if not log_content: - logger.debug(f"[{sandbox_id}] Log file is empty or not found") return "" - logger.debug(f"[{sandbox_id}] Retrieved log content ({len(log_content)} bytes)") - session_id = self._extract_session_id_from_log(log_content) - return session_id + return self._extract_session_id_from_log(log_content) except Exception as e: logger.error(f"[{sandbox_id}] Error retrieving session ID: {str(e)}") return "" def _extract_session_id_from_log(self, log_content: str) -> str: - """Extract session ID from IFlow log file content. - - Args: - log_content: Content from the log file - - Returns: - Session ID string if found, empty string otherwise - """ sandbox_id = self._sandbox.sandbox_id logger.debug(f"[{sandbox_id}] Attempting to extract session-id from log content") try: json_match = re.search(r"\s*(.*?)\s*", log_content, re.DOTALL) - if not json_match: - logger.debug(f"[{sandbox_id}] No block found in log") return "" json_str = json_match.group(1).strip() data = json.loads(json_str) session_id = data.get("session-id", "") - if session_id: logger.info(f"[{sandbox_id}] Successfully extracted session-id: {session_id}") - return session_id - else: - logger.debug(f"[{sandbox_id}] session-id field not found in Execution Info") - return "" + return session_id or "" except json.JSONDecodeError as e: logger.warning(f"[{sandbox_id}] Failed to parse JSON in Execution Info: {str(e)}") @@ -348,101 +292,21 @@ def _extract_session_id_from_log(self, log_content: str) -> str: logger.warning(f"[{sandbox_id}] Error extracting session-id: {str(e)}") return "" - async def run( - self, - problem_statement: str, - project_path: str, - agent_run_timeout: int = 1800, - agent_run_check_interval: int = 30, - ) -> Observation: - """Run IFlow CLI to solve a specified problem. - - Automatically attempts to retrieve the previous session ID from the log file. - If a session ID is found, it will be used to resume the previous execution. - - Args: - problem_statement: Problem statement that IFlow CLI will attempt to solve - project_path: Project path to work on - agent_run_timeout: Agent execution timeout in seconds (default: 1800) - agent_run_check_interval: Interval for checking progress in seconds (default: 30) - - Returns: - Observation: Execution result with exit code and output - """ + @override + async def create_agent_run_cmd(self, prompt: str) -> str: + """Create IFlow run command (NOT wrapped by bash -c).""" sandbox_id = self._sandbox.sandbox_id - start_time = time.time() - - logger.info(f"[{sandbox_id}] Starting IFlow CLI run operation") - logger.debug(f"[{sandbox_id}] Project path: {project_path}, Problem statement: {problem_statement[:100]}...") - - try: - # Step 1: Change to project directory - self._log_step(f"Changing to project directory: {project_path}", step_name="CD Project") - result = await self._sandbox.arun( - cmd=f"cd {project_path}", - session=self.agent_session, - ) - - if result.exit_code != 0: - logger.error(f"[{sandbox_id}] Failed to change directory to {project_path}: {result.output}") - return result - logger.debug(f"[{sandbox_id}] Successfully changed working directory") - - # Step 2: Retrieve session ID from previous execution - logger.info(f"[{sandbox_id}] Attempting to retrieve session ID from previous execution") - session_id = await self._get_session_id_from_sandbox() - if session_id: - logger.info(f"[{sandbox_id}] Using existing session ID: {session_id}") - else: - logger.info(f"[{sandbox_id}] No previous session found, will start fresh execution") - - # Step 3: Execute IFlow CLI command - self._log_step( - f"Running IFlow CLI with timeout {agent_run_timeout}s", - step_name="IFlow Execution", - ) - - iflow_run_cmd = self.config.iflow_run_cmd.format( - session_id=f'"{session_id}"', - problem_statement=shlex.quote(problem_statement), - iflow_log_file=self.config.iflow_log_file, - ) - logger.debug(f"[{sandbox_id}] Formatted IFlow command: {iflow_run_cmd}") - result = await self._agent_run( - cmd=f"bash -c {shlex.quote(iflow_run_cmd)}", - session=self.agent_session, - wait_timeout=agent_run_timeout, - wait_interval=agent_run_check_interval, - ) - - # Step 4: Log execution outcome - log_file_path = self.config.iflow_log_file - result_log = await self._sandbox.arun( - cmd=f"tail -1000 {log_file_path} 2>/dev/null || echo ''", - session=self.agent_session, - ) - log_content = result_log.output - - elapsed_total = time.time() - start_time - - if result and result.exit_code == 0: - logger.info( - f"[{sandbox_id}] ✓ IFlow-Cli completed successfully " - f"(exit_code: {result.exit_code}, elapsed: {elapsed_total:.2f}s)" - ) - logger.debug(f"[{sandbox_id}] Output: {log_content}") - else: - error_msg = result.failure_reason if result else "No result returned" - logger.error(f"[{sandbox_id}] ✗ IFlow-Cli failed - {error_msg} (elapsed: {elapsed_total:.2f}s)") - logger.error(f"[{sandbox_id}] Output: {log_content}") + session_id = await self._get_session_id_from_sandbox() + if session_id: + logger.info(f"[{sandbox_id}] Using existing session ID: {session_id}") + else: + logger.info(f"[{sandbox_id}] No previous session found, will start fresh execution") - return result + iflow_run_cmd = self.config.iflow_run_cmd.format( + session_id=f'"{session_id}"', + problem_statement=shlex.quote(prompt), + iflow_log_file=self.config.iflow_log_file, + ) - except Exception as e: - elapsed_total = time.time() - start_time - logger.error( - f"[{sandbox_id}] IFlow CLI execution failed - {str(e)} (elapsed: {elapsed_total:.2f}s)", - exc_info=True, - ) - raise + return f"cd {shlex.quote(self.config.workdir)} && {iflow_run_cmd}" diff --git a/rock/sdk/sandbox/agent/openhands.py b/rock/sdk/sandbox/agent/openhands.py index 88500da8..68f9ee6c 100644 --- a/rock/sdk/sandbox/agent/openhands.py +++ b/rock/sdk/sandbox/agent/openhands.py @@ -1,27 +1,30 @@ """ +TODO: refactor openhands work like swe-agent Solving software engineering(SWE) problem with [Openhands Benchmarks SDK](https://github.com/OpenHands/benchmarks.git). Implementation framework reference: `rock/sdk/sandbox/agent/swe_agent.py` """ + from __future__ import annotations -import os -import time -import json import copy +import json +import os import shlex import tempfile -from pathlib import Path +import time from contextlib import contextmanager +from pathlib import Path +from typing import Any, Literal + +from typing_extensions import override from rock import env_vars +from rock.actions import Observation, UploadRequest, WriteFileRequest from rock.logger import init_logger +from rock.sdk.sandbox.agent.base import BaseAgent +from rock.sdk.sandbox.agent.config import BaseAgentConfig from rock.sdk.sandbox.client import Sandbox from rock.sdk.sandbox.utils import arun_with_retry -from rock.sdk.sandbox.agent.base import DefaultAgent -from rock.sdk.sandbox.agent.config import DefaultAgentConfig -from rock.actions import UploadRequest, Observation, WriteFileRequest - -from typing import Any, Literal logger = init_logger(__name__) @@ -47,7 +50,7 @@ 1.5 Hightlight any best practices to take into account when testing and fixing the issue Phase 2. RUNNING: install and run the tests on the repository - 2.1 Activate the environment by running + 2.1 Activate the environment by running ./opt/miniconda3/etc/profile.d/conda.sh ; conda activate testbed 2.2 Follow the readme 2.3 Install the environment and anything needed @@ -129,11 +132,11 @@ "custom_tokenizer": None, "native_tool_calling": True, "extended_thinking_budget": 200000, - } + }, } -class OpenhandsConfig(DefaultAgentConfig): +class OpenhandsConfig(BaseAgentConfig): """Configuration dataclass for Openhands initialization and execution. This class defines all configurable parameters for setting up and running @@ -143,7 +146,7 @@ class OpenhandsConfig(DefaultAgentConfig): Attributes: agent_type: Fixed identifier for this agent type ("openhands") agent_session: Name of the bash session used for Openhands execution - agent_workdir: Working directory for agent installation and execution + workdir: Working directory for agent installation and execution python_install_cmd: Command to install Python environment openhands_sdk_install_cmd_list: Commands to clone and install Openhands/benchmarks repository python_install_timeout: Maximum seconds to wait for Python installation @@ -155,10 +158,8 @@ class OpenhandsConfig(DefaultAgentConfig): agent_type: Literal["openhands"] = "openhands" - agent_session: str = "openhands-rollout-session" - # directory where Openhands will be installed - agent_workdir: str = "/openhands" + workdir: str = "/openhands" python_install_cmd: str = env_vars.ROCK_AGENT_PYTHON_v12_INSTALL_CMD @@ -170,23 +171,19 @@ class OpenhandsConfig(DefaultAgentConfig): "rm -rf /openhands/benchmarks", "git clone -b features/local_workspace_fix_early_stop https://github.com/shayue-wt/benchmarks.git /openhands/benchmarks", "/openhands/python/bin/pip install datasets huggingface-hub jinja2 pandas Pillow toml swebench", - "/openhands/python/bin/pip install tqdm 'unidiff>=0.7.5,<0.8.0' 'modal>=1.1.4' commit0 pytest-json-report" + "/openhands/python/bin/pip install tqdm 'unidiff>=0.7.5,<0.8.0' 'modal>=1.1.4' commit0 pytest-json-report", ] python_install_timeout: int = 300 - agent_install_timeout: int = 600 - default_run_single_config: dict[str, Any] = DEFAULT_RUN_SINGLE_CONFIG - session_envs: dict[str, str] = {} - agent_prompt: str = DEFAULT_PROMPT max_iteration: int = 300 -class Openhands(DefaultAgent): +class Openhands(BaseAgent): """ Openhands implementation for automated software engineering tasks. @@ -195,6 +192,7 @@ class Openhands(DefaultAgent): to be compatible with SWE tasks other than SWE-Bench. It orchestrates rollout generation, patch retrieval, and result validation. """ + sandbox: Sandbox config: OpenhandsConfig @@ -207,9 +205,10 @@ def __init__(self, sandbox: Sandbox, config: OpenhandsConfig): """ super().__init__(sandbox, config) - self.agent_prompt_path = f"{self.config.agent_workdir}/benchmarks/benchmarks/swebench/prompts/custom.j2" + self.agent_prompt_path = f"{self.config.workdir}/benchmarks/benchmarks/swebench/prompts/custom.j2" - async def _install(self): + @override + async def install(self): """Install Openhands/benchmarks and configure the environment.""" sandbox_id = self._sandbox.sandbox_id @@ -220,7 +219,7 @@ async def _install(self): try: # Step 1: Create working directory step_start = time.time() - mkdir_cmd = f"mkdir -p {self.config.agent_workdir}" + mkdir_cmd = f"mkdir -p {self.config.workdir}" logger.debug(f"[{sandbox_id}] Command: {mkdir_cmd}") await self._sandbox.arun(cmd=mkdir_cmd, session=self.agent_session) elapsed_step = time.time() - step_start @@ -228,7 +227,7 @@ async def _install(self): # Step 2: Install Python step_start = time.time() - python_install_cmd = f"cd {self.config.agent_workdir} && {self.config.python_install_cmd}" + python_install_cmd = f"cd {self.config.workdir} && {self.config.python_install_cmd}" full_cmd = f"bash -c {shlex.quote(python_install_cmd)}" logger.debug(f"[{sandbox_id}] Command: {full_cmd}") @@ -283,18 +282,14 @@ async def _upload_config(self): if self.config.agent_prompt != DEFAULT_PROMPT: r = await self._sandbox.write_file( - WriteFileRequest( - content=self.config.agent_prompt, - path=self.agent_prompt_path - ) + WriteFileRequest(content=self.config.agent_prompt, path=self.agent_prompt_path) ) assert r.success, f"agent prompt write failed: {r.error}" logger.debug("agent prompt write successfully...") r = await self._sandbox.write_file( WriteFileRequest( - content=json.dumps(config["llm"], indent=4), - path=f"{self.config.agent_workdir}/benchmarks/.llm_config.json" + content=json.dumps(config["llm"], indent=4), path=f"{self.config.workdir}/benchmarks/.llm_config.json" ) ) assert r.success, f"llm configuration write failed: {r.error}" @@ -302,8 +297,7 @@ async def _upload_config(self): @contextmanager def _config_template_context( - self, problem_statement: str, project_path: str, instance_id: str, - repo_name: str, base_commit: str + self, problem_statement: str, project_path: str, instance_id: str, repo_name: str, base_commit: str ): """Context manager for temporary config file generation and cleanup. @@ -356,12 +350,12 @@ def _config_template_context( logger.warning(f"⚠ Could not clean up temporary config file {temp_file_path}: {e}") async def run( - self, - problem_statement: str, - project_path: str, - instance_id: str, - agent_run_timeout: int = 1800, - agent_run_check_interval: int = 30, + self, + problem_statement: str, + project_path: str, + instance_id: str, + agent_run_timeout: int = 1800, + agent_run_check_interval: int = 30, ) -> Observation: """Execute Openhands with the specified problem statement and project path. @@ -399,7 +393,7 @@ async def run( instance_config = Path(generated_config_path).name step_start = time.time() - target_path = f"{self.config.agent_workdir}/benchmarks/{instance_config}" + target_path = f"{self.config.workdir}/benchmarks/{instance_config}" logger.debug( f"[{sandbox_id}] UploadRequest(source_path={os.path.abspath(generated_config_path)}, " f"target_path={target_path})" @@ -419,15 +413,15 @@ async def run( # Execute Openhands step_start = time.time() agent_run_cmd = ( - f"cd {self.config.agent_workdir}/benchmarks && " + f"cd {self.config.workdir}/benchmarks && " "export PYTHONPATH='.' && " - f"{self.config.agent_workdir}/python/bin/python " + f"{self.config.workdir}/python/bin/python " "./benchmarks/swebench/run_infer.py " ".llm_config.json --dataset eval --split test --note rock_rollout " f"--select ./{instance_config} --max-iterations {self.config.max_iteration}" ) if self.config.agent_prompt != DEFAULT_PROMPT: - agent_run_cmd += f" --prompt-path benchmarks/swebench/prompts/custom.j2" + agent_run_cmd += " --prompt-path benchmarks/swebench/prompts/custom.j2" full_cmd = f"bash -c {shlex.quote(agent_run_cmd)}" logger.debug( diff --git a/rock/sdk/sandbox/agent/runtime_env/__init__.py b/rock/sdk/sandbox/agent/runtime_env/__init__.py new file mode 100644 index 00000000..b5383f87 --- /dev/null +++ b/rock/sdk/sandbox/agent/runtime_env/__init__.py @@ -0,0 +1,9 @@ +from rock.sdk.sandbox.agent.runtime_env.base import AgentRuntimeEnv +from rock.sdk.sandbox.agent.runtime_env.node_env import NodeAgentRuntimeEnv +from rock.sdk.sandbox.agent.runtime_env.python_env import PythonAgentRuntimeEnv + +__all__ = [ + "AgentRuntimeEnv", + "PythonAgentRuntimeEnv", + "NodeAgentRuntimeEnv", +] diff --git a/rock/sdk/sandbox/agent/runtime_env/base.py b/rock/sdk/sandbox/agent/runtime_env/base.py new file mode 100644 index 00000000..65f30918 --- /dev/null +++ b/rock/sdk/sandbox/agent/runtime_env/base.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +from rock.actions import CreateBashSessionRequest +from rock.sdk.sandbox.client import RunMode, RunModeType +from rock.sdk.sandbox.utils import arun_with_retry + +if TYPE_CHECKING: + from rock.sdk.sandbox.client import Sandbox + + +class AgentRuntimeEnv(ABC): + """Runtime environment for agents (e.g., Python/Node). + + Key points: + - Runtime env maintains its own bash session. + - Runtime env is responsible for creating that session if missing. + - Runtime env provides wrap/run helpers to execute commands under that runtime context. + """ + + def __init__( + self, + sandbox: Sandbox, + workdir: str, + session: str = "agent-runtime-env-session", + install_cmd: str | None = None, + prepare_timeout: int = 300, + session_envs: dict[str, str] | None = None, + ) -> None: + self._sandbox = sandbox + self.session = session + self.session_envs = session_envs or {} + self._prepared: bool = False + self._session_ready: bool = False + self.workdir = workdir + self.install_cmd = install_cmd + self.prepare_timeout = prepare_timeout + + @property + def prepared(self) -> bool: + return self._prepared + + async def ensure_session(self) -> None: + """Ensure runtime env session exists. Safe to call multiple times.""" + if self._session_ready: + return + + # Try to create; if already exists, sandbox may raise—treat as ok. + try: + await self._sandbox.create_session( + CreateBashSessionRequest( + session=self.session, + env_enable=True, + env=self.session_envs, + ) + ) + except Exception: + pass + + self._session_ready = True + + @abstractmethod + async def prepare(self) -> None: + """Prepare the runtime in the sandbox (install/unpack, validate).""" + raise NotImplementedError + + @abstractmethod + def wrap(self, cmd: str) -> str: + """Wrap a shell command so it runs under this runtime (e.g., inject PATH).""" + raise NotImplementedError + + async def run( + self, + cmd: str, + mode: RunModeType = RunMode.NOHUP, + wait_timeout: int = 600, + error_msg: str = "runtime env command failed", + ): + """Run a command under this runtime via arun_with_retry.""" + await self.ensure_session() + wrapped = self.wrap(cmd) + return await arun_with_retry( + sandbox=self._sandbox, + cmd=wrapped, + session=self.session, + mode=mode, + wait_timeout=wait_timeout, + error_msg=error_msg, + ) diff --git a/rock/sdk/sandbox/agent/runtime_env/node_env.py b/rock/sdk/sandbox/agent/runtime_env/node_env.py new file mode 100644 index 00000000..a551fc86 --- /dev/null +++ b/rock/sdk/sandbox/agent/runtime_env/node_env.py @@ -0,0 +1,85 @@ +from __future__ import annotations + +import shlex +from typing import TYPE_CHECKING + +from rock import env_vars +from rock.logger import init_logger +from rock.sdk.sandbox.agent.runtime_env.base import AgentRuntimeEnv +from rock.sdk.sandbox.utils import arun_with_retry + +if TYPE_CHECKING: + from rock.sdk.sandbox.client import Sandbox + +logger = init_logger(__name__) + + +class NodeAgentRuntimeEnv(AgentRuntimeEnv): + """Node runtime env. + + This is a minimal runtime env: + - prepare(): ensures workdir exists, then runs node_install_cmd in that workdir + - wrap(): currently just runs via bash -c (no fixed layout enforced) + (If later you standardize node to a fixed folder under workdir, you can inject PATH here.) + """ + + def __init__( + self, + sandbox: Sandbox, + workdir: str, + session: str = "node-runtime-env-session", + install_cmd: str | None = None, + prepare_timeout: int = 300, + session_envs: dict[str, str] | None = None, + ) -> None: + super().__init__( + sandbox=sandbox, + workdir=workdir, + session=session, + install_cmd=install_cmd, + prepare_timeout=prepare_timeout, + session_envs=session_envs, + ) + self.node_install_cmd = install_cmd or env_vars.ROCK_AGENT_NPM_INSTALL_CMD + + def wrap(self, cmd: str) -> str: + return f"bash -c {shlex.quote(cmd)}" + + async def prepare(self) -> None: + await self.ensure_session() + + sandbox_id = self._sandbox.sandbox_id + logger.info(f"[{sandbox_id}] Preparing Node runtime env (workdir={self.workdir})") + + # 1) ensure workdir exists + await self._sandbox.arun( + cmd=f"mkdir -p {shlex.quote(self.workdir)}", + session=self.session, + ) + + from rock.sdk.sandbox.client import RunMode + + # 2) install node/npm (script may install globally) + install_cmd = f"cd {shlex.quote(self.workdir)} && {self.node_install_cmd}" + await arun_with_retry( + sandbox=self._sandbox, + cmd=f"bash -c {shlex.quote(install_cmd)}", + session=self.session, + mode=RunMode.NOHUP, + wait_timeout=self.prepare_timeout, + error_msg="Node runtime installation failed", + ) + + # 3) lightweight validation (best-effort) + # Some images may not have node in PATH immediately; treat failure as warning. + res = await self._sandbox.arun( + cmd="command -v node >/dev/null 2>&1 && node --version || true", + session=self.session, + ) + logger.debug(f"[{sandbox_id}] Node validation output: {res.output[:200]}") + + if res.exit_code != 0: + raise RuntimeError("Node runtime validation failed") + + self._prepared = True + logger.info(f"[{sandbox_id}] Node runtime env prepared") diff --git a/rock/sdk/sandbox/agent/runtime_env/python_env.py b/rock/sdk/sandbox/agent/runtime_env/python_env.py new file mode 100644 index 00000000..b714b013 --- /dev/null +++ b/rock/sdk/sandbox/agent/runtime_env/python_env.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import shlex +from typing import TYPE_CHECKING + +from rock import env_vars +from rock.logger import init_logger +from rock.sdk.sandbox.agent.runtime_env.base import AgentRuntimeEnv +from rock.sdk.sandbox.utils import arun_with_retry + +if TYPE_CHECKING: + from rock.sdk.sandbox.client import Sandbox + +logger = init_logger(__name__) + + +class PythonAgentRuntimeEnv(AgentRuntimeEnv): + """Python runtime env. + + Contract: + - workdir/python is the runtime root directory (fixed). + - workdir/python/bin contains python/pip and any console_scripts (e.g. sweagent). + """ + + def __init__( + self, + sandbox: Sandbox, + workdir: str, + session: str = "python-runtime-env-session", + install_cmd: str | None = None, + prepare_timeout: int = 300, + session_envs: dict[str, str] | None = None, + ) -> None: + super().__init__( + sandbox=sandbox, + workdir=workdir, + session=session, + install_cmd=install_cmd, + prepare_timeout=prepare_timeout, + session_envs=session_envs, + ) + self.python_install_cmd = install_cmd or env_vars.ROCK_AGENT_PYTHON_INSTALL_CMD + + @property + def bin_dir(self) -> str: + return f"{self.workdir}/python/bin" + + def wrap(self, cmd: str) -> str: + wrapped = f"export PATH={shlex.quote(self.bin_dir)}:$PATH && {cmd}" + return f"bash -c {shlex.quote(wrapped)}" + + async def prepare(self) -> None: + await self.ensure_session() + + sandbox_id = self._sandbox.sandbox_id + logger.info(f"[{sandbox_id}] Preparing Python runtime env in {self.workdir}") + + # 1) ensure workdir exists + await self._sandbox.arun( + cmd=f"mkdir -p {shlex.quote(self.workdir)}", + session=self.session, + ) + + from rock.sdk.sandbox.client import RunMode + + # 2) run install cmd in workdir (must create ./python) + install_cmd = f"cd {shlex.quote(self.workdir)} && {self.python_install_cmd}" + await arun_with_retry( + sandbox=self._sandbox, + cmd=f"bash -c {shlex.quote(install_cmd)}", + session=self.session, + mode=RunMode.NOHUP, + wait_timeout=self.prepare_timeout, + error_msg="Python runtime installation failed", + ) + + # 3) validate python exists + check_cmd = f"test -x {shlex.quote(self.bin_dir)}/python" + result = await self._sandbox.arun( + cmd=check_cmd, + session=self.session, + ) + if result.exit_code != 0: + raise RuntimeError( + "PythonAgentRuntimeEnv.prepare() failed: " + f"{self.bin_dir}/python not found or not executable. " + "Ensure python_install_cmd installs into ./python under workdir." + ) + + self._prepared = True + logger.info(f"[{sandbox_id}] Python runtime env prepared") diff --git a/rock/sdk/sandbox/agent/swe_agent.py b/rock/sdk/sandbox/agent/swe_agent.py index 4ddeb031..e6fa6be2 100644 --- a/rock/sdk/sandbox/agent/swe_agent.py +++ b/rock/sdk/sandbox/agent/swe_agent.py @@ -6,17 +6,17 @@ import tempfile import time from contextlib import contextmanager -from pathlib import Path from typing import TYPE_CHECKING, Any, Literal import yaml +from typing_extensions import override from rock import env_vars -from rock.actions import Observation, UploadRequest +from rock.actions import UploadRequest from rock.logger import init_logger -from rock.sdk.sandbox.agent.base import DefaultAgent -from rock.sdk.sandbox.agent.config import DefaultAgentConfig -from rock.sdk.sandbox.utils import arun_with_retry +from rock.sdk.sandbox.agent.base import BaseAgent +from rock.sdk.sandbox.agent.config import BaseAgentConfig +from rock.sdk.sandbox.agent.runtime_env.python_env import PythonAgentRuntimeEnv if TYPE_CHECKING: from rock.sdk.sandbox.client import Sandbox @@ -24,9 +24,7 @@ logger = init_logger(__name__) - DEFAULT_SYSTEM_TEMPLATE = "You are a helpful assistant that can interact with a computer to solve tasks." - DEFAULT_INSTANCE_TEMPLATE = """ {{working_dir}} @@ -65,7 +63,6 @@ """ ] -DEFAULT_PARSE_FUNCTION_TYPE = "function_calling" DEFAULT_NEXT_STEP_TEMPLATE = "OBSERVATION:\n{{observation}}" DEFAULT_NEXT_STEP_NO_OUTPUT_TEMPLATE = "Your command ran successfully and did not produce any output." @@ -127,31 +124,11 @@ } -class SweAgentConfig(DefaultAgentConfig): - """Configuration dataclass for SWE-agent initialization and execution. - - Inherits common agent configuration and adds SWE-agent specific settings. - - Attributes: - agent_type: Fixed identifier for this agent type ("swe-agent") - default_run_single_config: Default configuration object for a single run - swe_agent_workdir: Working directory for agent installation and execution - python_install_cmd: Command to install Python environment - swe_agent_install_cmd: Command to clone and install SWE-agent repository - python_install_timeout: Maximum seconds to wait for Python installation - swe_agent_install_timeout: Maximum seconds to wait for SWE-agent installation - agent_run_timeout: Maximum seconds to wait for agent execution completion - agent_run_check_interval: Seconds between status checks during execution - """ +class SweAgentConfig(BaseAgentConfig): + """SWE-agent configuration.""" agent_type: Literal["swe-agent"] = "swe-agent" - agent_session: str = "swe-agent-session" - - post_startup_bash_cmd_list: list[str] = [] - - swe_agent_workdir: str = "/tmp_sweagent" - python_install_cmd: str = env_vars.ROCK_AGENT_PYTHON_INSTALL_CMD swe_agent_install_cmd: str = ( @@ -160,56 +137,36 @@ class SweAgentConfig(DefaultAgentConfig): "cd SWE-agent && pip install -e . -i https://mirrors.aliyun.com/pypi/simple/" ) - python_install_timeout: int = 300 - - swe_agent_install_timeout: int = 600 - default_run_single_config: dict[str, Any] = DEFAULT_RUN_SINGLE_CONFIG - session_envs: dict[str, str] = {} +class SweAgent(BaseAgent): + """SWE-agent implementation (runtime env migrated).""" -class SweAgent(DefaultAgent): - """SWE-agent implementation with integrated ModelService support. - - Manages the complete lifecycle of SWE-agent including environment - initialization, dependency installation, and task execution within - a sandboxed environment. - """ + GENERATED_CONFIG_NAME = "generated_config.yaml" def __init__(self, sandbox: Sandbox, config: SweAgentConfig): - """Initialize SWE-agent with sandbox environment and configuration. - - Args: - sandbox: Sandbox instance for isolated agent execution - config: Configuration parameters for agent setup - """ super().__init__(sandbox, config) - self.config: SweAgentConfig = config - async def _install(self): - """Install SWE-agent and configure the environment. + self.agent_runtime_env = PythonAgentRuntimeEnv( + sandbox=self._sandbox, + workdir=self.config.agent_installed_dir, + install_cmd=self.config.python_install_cmd, + session=self.agent_session, + prepare_timeout=self.config.runtime_env_prepare_timeout, + ) - Steps: - 1. Create working directory - 2. Install Python environment - 3. Clone and install SWE-agent repository - """ + @override + async def install(self): sandbox_id = self._sandbox.sandbox_id start_time = time.time() - logger.info(f"[{sandbox_id}] Starting SWE-agent installation") try: - # Step 1: Create working directory - await self._create_working_directory() - - # Step 2: Install Python - await self._install_python() - - # Step 3: Install SWE-agent + await self.agent_runtime_env.prepare() await self._install_swe_agent_package() + await self._upload_generated_config_template() elapsed = time.time() - start_time logger.info(f"[{sandbox_id}] SWE-agent installation completed (elapsed: {elapsed:.2f}s)") @@ -222,103 +179,75 @@ async def _install(self): ) raise - async def _create_working_directory(self): - """Create working directory for SWE-agent.""" - sandbox_id = self._sandbox.sandbox_id + async def _install_swe_agent_package(self): step_start = time.time() - self._log_step(f"Creating working directory: {self.config.swe_agent_workdir}", step_name="Create Workdir") - - mkdir_cmd = f"mkdir -p {self.config.swe_agent_workdir}" - logger.debug(f"[{sandbox_id}] Command: {mkdir_cmd}") - - await self._sandbox.arun( - cmd=mkdir_cmd, - session=self.agent_session, - ) - - elapsed_step = time.time() - step_start - self._log_step("Working directory created", step_name="Create Workdir", is_complete=True, elapsed=elapsed_step) - - async def _install_python(self): - """Install Python environment.""" - sandbox_id = self._sandbox.sandbox_id - step_start = time.time() + self._log_step("Installing SWE-agent repository", step_name="SWE-agent Install") - self._log_step("Installing Python environment", step_name="Python Install") + if not self.agent_runtime_env.prepared: + raise RuntimeError("Python runtime env is not prepared. Call python_env.prepare() before installation.") - python_install_cmd = f"cd {self.config.swe_agent_workdir} && {self.config.python_install_cmd}" - full_cmd = f"bash -c {shlex.quote(python_install_cmd)}" - logger.debug(f"[{sandbox_id}] Command: {full_cmd}") + swe_agent_install_cmd = f"cd {self.config.agent_installed_dir} && {self.config.swe_agent_install_cmd}" - await arun_with_retry( - sandbox=self._sandbox, - cmd=full_cmd, - session=self.agent_session, - mode="nohup", - wait_timeout=self.config.python_install_timeout, - error_msg="Python installation failed", + await self.agent_runtime_env.run( + cmd=swe_agent_install_cmd, + wait_timeout=self.config.agent_install_timeout, + error_msg="SWE-agent installation failed", ) elapsed_step = time.time() - step_start self._log_step( - "Python environment installed", step_name="Python Install", is_complete=True, elapsed=elapsed_step + "SWE-agent repository installed", + step_name="SWE-agent Install", + is_complete=True, + elapsed=elapsed_step, ) - async def _install_swe_agent_package(self): - """Clone and install SWE-agent repository.""" + async def _upload_generated_config_template(self) -> None: + """Generate and upload a static template config to agent_installed_dir/generated_config.yaml. + + The prompt/problem_statement text will be injected at runtime via CLI args in _create_agent_run_cmd(). + """ sandbox_id = self._sandbox.sandbox_id step_start = time.time() - self._log_step("Installing SWE-agent repository", step_name="SWE-agent Install") + self._log_step("Generating and uploading SWE-agent config template", step_name="Upload Config") - swe_agent_install_cmd = ( - f"export PATH={self.config.swe_agent_workdir}/python/bin:$PATH && " - f"cd {self.config.swe_agent_workdir} && " - f"{self.config.swe_agent_install_cmd}" - ) - full_cmd = f"bash -c {shlex.quote(swe_agent_install_cmd)}" - logger.debug(f"[{sandbox_id}] Command: {full_cmd}") + with self._generated_config_template_context() as local_path: + target_path = f"{self.config.agent_installed_dir}/{self.GENERATED_CONFIG_NAME}" - await arun_with_retry( - sandbox=self._sandbox, - cmd=full_cmd, - session=self.agent_session, - mode="nohup", - wait_timeout=self.config.swe_agent_install_timeout, - error_msg="SWE-agent installation failed", - ) + await self._sandbox.upload( + UploadRequest( + source_path=os.path.abspath(local_path), + target_path=target_path, + ) + ) + + logger.debug(f"[{sandbox_id}] Uploaded config template to {target_path}") elapsed_step = time.time() - step_start self._log_step( - "SWE-agent repository installed", - step_name="SWE-agent Install", + "Configuration template uploaded", + step_name="Upload Config", is_complete=True, elapsed=elapsed_step, ) @contextmanager - def _config_template_context(self, problem_statement: str, project_path: str, instance_id: str): - """Context manager for temporary config file generation and cleanup. - - Args: - problem_statement: The problem statement for the task - project_path: Path to the target project - instance_id: The instance identifier for the run - - Yields: - Path to the temporary config file - """ - # Create a copy to avoid modifying the original + def _generated_config_template_context(self): + """Create a local temporary YAML config (template) for SWE-agent.""" new_config = copy.deepcopy(self.config.default_run_single_config) - # Set output directory - new_config["output_dir"] = f"/tmp_sweagent/{instance_id}" + # output_dir uses instance_id from config + if self.config.instance_id: + new_config["output_dir"] = f"{self.config.agent_installed_dir}/{self.config.instance_id}" + else: + new_config["output_dir"] = f"{self.config.agent_installed_dir}/generated_output" - # Update project path + # repo/project path uses project_path from config + project_path = self.config.workdir if "env" in new_config and "repo" in new_config["env"]: is_root_level = os.path.dirname(project_path) == "/" - if is_root_level: repo_name = os.path.basename(project_path) new_config["env"]["repo"]["repo_name"] = repo_name @@ -326,28 +255,24 @@ def _config_template_context(self, problem_statement: str, project_path: str, in else: new_config["env"]["repo"]["path"] = project_path new_config["env"]["repo"]["type"] = "local" - # base_commit is set using default value in template - # Update problem statement + # problem_statement will be injected at runtime; keep empty here if "problem_statement" in new_config: - new_config["problem_statement"]["text"] = problem_statement - new_config["problem_statement"]["id"] = instance_id + new_config["problem_statement"]["text"] = "" + new_config["problem_statement"]["id"] = self.config.instance_id - # Create a temporary config file temp_config_file = tempfile.NamedTemporaryFile( mode="w", - suffix=f"_{instance_id}_generated_config.yaml", + suffix="_generated_config.yaml", delete=False, encoding="utf-8", ) - temp_file_path = temp_config_file.name + try: yaml.dump(new_config, temp_config_file, default_flow_style=False, allow_unicode=True) temp_config_file.close() yield temp_file_path - except Exception as e: - raise e finally: try: os.unlink(temp_file_path) @@ -355,118 +280,13 @@ def _config_template_context(self, problem_statement: str, project_path: str, in except OSError as e: logger.warning(f"Failed to clean up temporary config file {temp_file_path}: {str(e)}") - async def run( - self, - problem_statement: str, - project_path: str, - instance_id: str, - agent_run_timeout: int = 1800, - agent_run_check_interval: int = 30, - ) -> Observation: - """Execute SWE-agent with the specified problem statement and project path. - - This method generates a configuration file from the default template, - uploads it to the sandbox and executes SWE-agent. If ModelService is configured, - it will be started and watch_agent will be called to monitor the agent process. - - Args: - problem_statement: The problem statement for the task - project_path: Path to the target project - instance_id: The instance identifier for the run - agent_run_timeout: Maximum seconds to wait for agent execution (default: 1800) - agent_run_check_interval: Seconds between status checks (default: 30) - - Returns: - Observation: Execution result containing exit code, stdout, and stderr - - Raises: - Exception: If agent execution fails - """ - sandbox_id = self._sandbox.sandbox_id - start_time = time.time() - - logger.info(f"[{sandbox_id}] Starting SWE-agent run operation") - logger.debug( - f"[{sandbox_id}] Project path: {project_path}, Instance ID: {instance_id}, " - f"Problem statement: {problem_statement[:100]}..." - ) - - try: - with self._config_template_context(problem_statement, project_path, instance_id) as generated_config_path: - config_filename = Path(generated_config_path).name - - # Upload configuration file - step_start = time.time() - target_path = f"{self.config.swe_agent_workdir}/{config_filename}" - logger.debug( - f"[{sandbox_id}] UploadRequest(source_path={os.path.abspath(generated_config_path)}, " - f"target_path={target_path})" - ) - - self._log_step("Uploading configuration file", step_name="Upload Config") - - await self._sandbox.upload( - UploadRequest( - source_path=os.path.abspath(generated_config_path), - target_path=target_path, - ) - ) - elapsed_step = time.time() - step_start - self._log_step( - "Configuration file uploaded", - step_name="Upload Config", - is_complete=True, - elapsed=elapsed_step, - ) - - # Execute SWE-agent - step_start = time.time() - self._log_step( - f"Running SWE-agent with timeout {agent_run_timeout}s", - step_name="SWE-agent Run", - ) - - swe_agent_run_cmd = ( - f"cd {self.config.swe_agent_workdir} && " - f"{self.config.swe_agent_workdir}/python/bin/sweagent run --config {config_filename}" - ) - full_cmd = f"bash -c {shlex.quote(swe_agent_run_cmd)}" - logger.debug( - f"[{sandbox_id}] Command: {full_cmd}\n" - f"Timeout: {agent_run_timeout}s, Check interval: {agent_run_check_interval}s" - ) - - result = await self._agent_run( - cmd=full_cmd, - session=self.agent_session, - wait_timeout=agent_run_timeout, - wait_interval=agent_run_check_interval, - ) - elapsed_step = time.time() - step_start - self._log_step( - "SWE-agent execution completed", - step_name="SWE-agent Run", - is_complete=True, - elapsed=elapsed_step, - ) - - elapsed_total = time.time() - start_time - - if result and result.exit_code == 0: - logger.info( - f"[{sandbox_id}] ✓ SWE-agent completed successfully " - f"(exit_code: {result.exit_code}, elapsed: {elapsed_total:.2f}s)" - ) - else: - error_msg = result.failure_reason if result else "No result returned" - logger.error(f"[{sandbox_id}] ✗ SWE-agent failed - {error_msg} (elapsed: {elapsed_total:.2f}s)") + @override + async def create_agent_run_cmd(self, prompt: str) -> str: + if not self.agent_runtime_env.prepared: + raise RuntimeError("Python runtime env is not prepared. Ensure agent.init() completed successfully.") - return result + swe_agent = f"{self.agent_runtime_env.bin_dir}/sweagent" + config_path = f"{self.config.agent_installed_dir}/{self.GENERATED_CONFIG_NAME}" - except Exception as e: - elapsed_total = time.time() - start_time - logger.error( - f"[{sandbox_id}] SWE-agent execution failed - {str(e)} (elapsed: {elapsed_total:.2f}s)", - exc_info=True, - ) - raise + cmd = f"{swe_agent} run --config {config_path} --problem_statement.text {shlex.quote(prompt)}" + return cmd diff --git a/rock/sdk/sandbox/client.py b/rock/sdk/sandbox/client.py index 27844cf4..b30e8b0a 100644 --- a/rock/sdk/sandbox/client.py +++ b/rock/sdk/sandbox/client.py @@ -14,7 +14,6 @@ from typing_extensions import deprecated from rock import env_vars -import rock from rock.actions import ( AbstractSandbox, Action, @@ -32,13 +31,13 @@ OssSetupResponse, ReadFileRequest, ReadFileResponse, + SandboxResponse, SandboxStatusResponse, UploadRequest, UploadResponse, WriteFileRequest, WriteFileResponse, ) -from rock.actions import SandboxResponse from rock.sdk.common.constants import PID_PREFIX, PID_SUFFIX, RunModeType from rock.sdk.common.exceptions import InternalServerRockError, InvalidParameterRockException, raise_for_code from rock.sdk.sandbox.agent.base import Agent diff --git a/rock/sdk/sandbox/model_service/base.py b/rock/sdk/sandbox/model_service/base.py index 5ca8499a..ad96a46c 100644 --- a/rock/sdk/sandbox/model_service/base.py +++ b/rock/sdk/sandbox/model_service/base.py @@ -99,6 +99,8 @@ async def install(self) -> None: Raises: Exception: If any installation step fails. """ + from rock.sdk.sandbox.client import RunMode + sandbox_id = self._sandbox.sandbox_id install_start_time = time.time() @@ -152,7 +154,7 @@ async def install(self) -> None: sandbox=self._sandbox, cmd=bash_python_cmd, session=self.config.model_service_session, - mode="nohup", + mode=RunMode.NOHUP, wait_timeout=self.config.python_install_timeout, error_msg="Python installation failed", ) @@ -174,7 +176,7 @@ async def install(self) -> None: sandbox=self._sandbox, cmd=bash_service_cmd, session=self.config.model_service_session, - mode="nohup", + mode=RunMode.NOHUP, wait_timeout=self.config.model_service_install_timeout, error_msg="Model service installation failed", ) diff --git a/rock/sdk/sandbox/process.py b/rock/sdk/sandbox/process.py index 11fc5b30..8dd2cc8a 100644 --- a/rock/sdk/sandbox/process.py +++ b/rock/sdk/sandbox/process.py @@ -1,9 +1,13 @@ from __future__ import annotations # Postpone annotation evaluation to avoid circular imports. +import shlex +import tarfile +import tempfile import time +from pathlib import Path from typing import TYPE_CHECKING -from rock.actions import Command, Observation +from rock.actions import Command, CreateBashSessionRequest, Observation from rock.logger import init_logger if TYPE_CHECKING: @@ -104,3 +108,98 @@ async def execute_script( logger.debug(f"Script cleaned up successfully: {script_path}") except Exception as e: logger.warning(f"Failed to cleanup script {script_path}: {e}") + + async def upload_dir( + self, + source_dir: str | Path, + target_dir: str, + extract_timeout: int = 600, + ) -> Observation: + """Upload local directory to sandbox using tar.gz (simple version). + + - Create a random bash session internally + - Check 'tar' exists; if not, return Observation with exit_code != 0 + - Pack source_dir fully into a tar.gz locally + - Upload to sandbox /tmp + - Extract into target_dir + - Always cleanup local tar.gz + + Returns: + Observation(exit_code=0) on success, otherwise exit_code!=0 with failure_reason. + """ + local_tar_path: Path | None = None + remote_tar_path: str | None = None + session: str | None = None + + try: + src = Path(source_dir).expanduser().resolve() + if not src.exists(): + return Observation(exit_code=1, failure_reason=f"source_dir not found: {src}") + if not src.is_dir(): + return Observation(exit_code=1, failure_reason=f"source_dir must be a directory: {src}") + if not isinstance(target_dir, str) or not target_dir.startswith("/"): + return Observation(exit_code=1, failure_reason=f"target_dir must be absolute path: {target_dir}") + + ts = str(time.time_ns()) + local_tar_path = Path(tempfile.gettempdir()) / f"rock_upload_{ts}.tar.gz" + remote_tar_path = f"/tmp/rock_upload_{ts}.tar.gz" + session = f"bash-{ts}" + + # create bash session + await self.sandbox.create_session(CreateBashSessionRequest(session=session)) + + # check tar exists + check = await self.sandbox.arun( + cmd="command -v tar >/dev/null 2>&1", + session=session, + ) + if check.exit_code != 0: + return Observation(exit_code=1, failure_reason="sandbox has no tar command; cannot extract tarball") + + # pack locally + try: + with tarfile.open(local_tar_path, "w:gz") as tf: + tf.add(str(src), arcname=".") + except Exception as e: + raise Exception(f"tar pack failed: {e}") + + # upload tarball + upload_response = await self.sandbox.upload_by_path( + file_path=str(local_tar_path), target_path=remote_tar_path + ) + if not upload_response.success: + return Observation(exit_code=1, failure_reason=f"tar upload failed: {upload_response.message}") + + # extract + extract_cmd = ( + f"rm -rf {shlex.quote(target_dir)} && mkdir -p {shlex.quote(target_dir)} && " + f"tar -xzf {shlex.quote(remote_tar_path)} -C {shlex.quote(target_dir)}" + ) + from rock.sdk.sandbox.client import RunMode + + res = await self.sandbox.arun( + cmd=f"bash -c {shlex.quote(extract_cmd)}", + mode=RunMode.NOHUP, + wait_timeout=extract_timeout, + ) + if res.exit_code != 0: + return Observation(exit_code=1, failure_reason=f"tar extract failed: {res.output}") + + # cleanup remote tarball + try: + await self.sandbox.execute(Command(command=["rm", "-f", remote_tar_path])) + except Exception: + pass + + return Observation(exit_code=0, output=f"uploaded {src} -> {target_dir} via tar") + + except Exception as e: + return Observation(exit_code=1, failure_reason=f"upload_dir unexpected error: {e}") + + finally: + # cleanup local tarball + try: + if local_tar_path and local_tar_path.exists(): + local_tar_path.unlink() + except Exception: + pass diff --git a/rock/sdk/sandbox/utils.py b/rock/sdk/sandbox/utils.py index 45b7ed29..3c44e885 100644 --- a/rock/sdk/sandbox/utils.py +++ b/rock/sdk/sandbox/utils.py @@ -5,7 +5,7 @@ from rock.utils import retry_async if TYPE_CHECKING: - from rock.sdk.sandbox.client import Sandbox + from rock.sdk.sandbox.client import RunModeType, Sandbox @retry_async(max_attempts=3, delay_seconds=5.0, backoff=2.0) @@ -13,7 +13,7 @@ async def arun_with_retry( sandbox: Sandbox, cmd: str, session: str, - mode: str = "nohup", + mode: RunModeType, wait_timeout: int = 300, wait_interval: int = 10, error_msg: str = "Command failed", diff --git a/tests/integration/sdk/sandbox/agent/swe_agent/test_init.py b/tests/integration/sdk/sandbox/agent/swe_agent/test_init.py index ec765d31..02feeaa7 100644 --- a/tests/integration/sdk/sandbox/agent/swe_agent/test_init.py +++ b/tests/integration/sdk/sandbox/agent/swe_agent/test_init.py @@ -47,14 +47,14 @@ async def test_swe_agent_initialization(sandbox_instance: Sandbox): await sandbox_instance.agent.init() # 3. Verify agent directory exists in root - agent_dir_name = os.path.basename(swe_agent_config.swe_agent_workdir) + agent_dir_name = os.path.basename(swe_agent_config.agent_installed_dir) await _verify_exists(sandbox_instance, "/", {agent_dir_name}) # 4. Verify agent installation directories - await _verify_exists(sandbox_instance, swe_agent_config.swe_agent_workdir, {"python", "SWE-agent"}) + await _verify_exists(sandbox_instance, swe_agent_config.agent_installed_dir, {"python", "SWE-agent"}) # 5. Verify Python executables - python_bin_path = f"{swe_agent_config.swe_agent_workdir}/python/bin" + python_bin_path = f"{swe_agent_config.agent_installed_dir}/python/bin" await _verify_exists(sandbox_instance, python_bin_path, {"sweagent"}) @@ -80,13 +80,13 @@ async def test_swe_agent_with_model_service(sandbox_instance: Sandbox): await sandbox_instance.agent.init() # 3. Verify both agent and model service directories exist in root - agent_dir_name = os.path.basename(swe_agent_config.swe_agent_workdir) + agent_dir_name = os.path.basename(swe_agent_config.agent_installed_dir) model_service_dir_name = os.path.basename(model_service_config.workdir) await _verify_exists(sandbox_instance, "/", {agent_dir_name, model_service_dir_name}) # 4. Verify agent installation directories - await _verify_exists(sandbox_instance, swe_agent_config.swe_agent_workdir, {"python", "SWE-agent"}) + await _verify_exists(sandbox_instance, swe_agent_config.agent_installed_dir, {"python", "SWE-agent"}) # 5. Verify Python executables - python_bin_path = f"{swe_agent_config.swe_agent_workdir}/python/bin" + python_bin_path = f"{swe_agent_config.agent_installed_dir}/python/bin" await _verify_exists(sandbox_instance, python_bin_path, {"sweagent"}) diff --git a/tests/integration/sdk/sandbox/agent/swe_agent/test_run.py b/tests/integration/sdk/sandbox/agent/swe_agent/test_run.py index 55741e58..ce1a5793 100644 --- a/tests/integration/sdk/sandbox/agent/swe_agent/test_run.py +++ b/tests/integration/sdk/sandbox/agent/swe_agent/test_run.py @@ -132,6 +132,8 @@ async def test_swe_agent_run(sandbox_instance: Sandbox) -> None: default_run_single_config=run_single_config, model_service_config=model_service_config, python_install_cmd=python_install_cmd, + workdir=project_path, + instance_id=test_instance_id, ) # Initialize and setup the agent @@ -151,15 +153,7 @@ async def test_swe_agent_run(sandbox_instance: Sandbox) -> None: inference_gen = call_model_inference_generator() # Run agent and model service in parallel - agent_run_task = asyncio.create_task( - sandbox_instance.agent.run( - problem_statement="rename 1.txt to 2.txt", - project_path=project_path, - instance_id=test_instance_id, - agent_run_timeout=1800, - agent_run_check_interval=30, - ) - ) + agent_run_task = asyncio.create_task(sandbox_instance.agent.run(prompt="rename 1.txt to 2.txt")) whale_service_task = asyncio.create_task(model_service_loop(sandbox_instance.agent, inference_gen)) @@ -171,7 +165,7 @@ async def test_swe_agent_run(sandbox_instance: Sandbox) -> None: logger.error(f"Task {i} failed: {type(result).__name__}: {str(result)}") patch_path = ( - f"{swe_agent_config.swe_agent_workdir}/{test_instance_id}/{test_instance_id}/{test_instance_id}.patch" + f"{swe_agent_config.agent_installed_dir}/{test_instance_id}/{test_instance_id}/{test_instance_id}.patch" ) file_content = await sandbox_instance.arun(cmd=f"cat {patch_path}", mode=RunMode.NOHUP) diff --git a/tests/integration/sdk/sandbox/test_process.py b/tests/integration/sdk/sandbox/test_process.py new file mode 100644 index 00000000..53af8d89 --- /dev/null +++ b/tests/integration/sdk/sandbox/test_process.py @@ -0,0 +1,98 @@ +"""Tests for Process.upload_dir (tar-based directory upload) using one sandbox.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +from rock.actions import Command +from rock.logger import init_logger +from rock.sdk.sandbox.client import Sandbox +from tests.integration.conftest import SKIP_IF_NO_DOCKER + +logger = init_logger(__name__) + + +def _make_local_fixture_dir(tmp_path: Path) -> Path: + root = tmp_path / "upload_dir_fixture" + root.mkdir(parents=True, exist_ok=True) + + (root / "a.txt").write_text("hello-a\n", encoding="utf-8") + + sub = root / "sub" + sub.mkdir(parents=True, exist_ok=True) + (sub / "b.txt").write_text("hello-b\n", encoding="utf-8") + + nested = root / "sub2" / "nested" + nested.mkdir(parents=True, exist_ok=True) + (nested / "c.txt").write_text("hello-c\n", encoding="utf-8") + + return root + + +async def _assert_upload_dir_success(sandbox: Sandbox, tmp_path: Path): + local_dir = _make_local_fixture_dir(tmp_path) + target_dir = "/tmp/test_process_upload_dir_success" + + obs = await sandbox.process.upload_dir(source_dir=local_dir, target_dir=target_dir) + assert obs.exit_code == 0, f"upload_dir failed: {obs.failure_reason or obs.output}" + + res = await sandbox.execute(Command(command=["bash", "-lc", f"cat {target_dir}/a.txt"])) + assert res.exit_code == 0 + assert res.stdout == "hello-a\n" + + res = await sandbox.execute(Command(command=["bash", "-lc", f"cat {target_dir}/sub/b.txt"])) + assert res.exit_code == 0 + assert res.stdout == "hello-b\n" + + res = await sandbox.execute(Command(command=["bash", "-lc", f"cat {target_dir}/sub2/nested/c.txt"])) + assert res.exit_code == 0 + assert res.stdout == "hello-c\n" + + +async def _assert_upload_dir_invalid_source(sandbox: Sandbox, tmp_path: Path): + missing_dir = tmp_path / "missing" + target_dir = "/tmp/test_process_upload_dir_invalid_source" + + obs = await sandbox.process.upload_dir(source_dir=missing_dir, target_dir=target_dir) + assert obs.exit_code != 0 + assert obs.failure_reason + + +async def _assert_upload_dir_invalid_target(sandbox: Sandbox, tmp_path: Path): + local_dir = _make_local_fixture_dir(tmp_path) + obs = await sandbox.process.upload_dir(source_dir=local_dir, target_dir="relative/path/not/allowed") + assert obs.exit_code != 0 + assert "absolute" in (obs.failure_reason or "").lower() + + +async def _assert_upload_dir_overwrite_existing(sandbox: Sandbox, tmp_path: Path): + local_dir = _make_local_fixture_dir(tmp_path) + target_dir = "/tmp/test_process_upload_dir_overwrite_existing" + + r = await sandbox.execute( + Command(command=["bash", "-lc", f"mkdir -p {target_dir} && echo junk > {target_dir}/junk.txt"]) + ) + assert r.exit_code == 0 + + obs = await sandbox.process.upload_dir(source_dir=local_dir, target_dir=target_dir) + assert obs.exit_code == 0, f"upload_dir failed: {obs.failure_reason or obs.output}" + + res = await sandbox.execute(Command(command=["bash", "-lc", f"test ! -f {target_dir}/junk.txt"])) + assert res.exit_code == 0 + + res = await sandbox.execute(Command(command=["bash", "-lc", f"cat {target_dir}/a.txt"])) + assert res.exit_code == 0 + assert res.stdout == "hello-a\n" + + +@pytest.mark.need_admin +@SKIP_IF_NO_DOCKER +@pytest.mark.asyncio +async def test_process_upload_dir_all_in_one(sandbox_instance: Sandbox, tmp_path: Path): + """Run all upload_dir checks in one sandbox.""" + await _assert_upload_dir_success(sandbox_instance, tmp_path) + await _assert_upload_dir_invalid_source(sandbox_instance, tmp_path) + await _assert_upload_dir_invalid_target(sandbox_instance, tmp_path) + await _assert_upload_dir_overwrite_existing(sandbox_instance, tmp_path)