39 changes: 28 additions & 11 deletions OS_images/main.py
@@ -346,16 +346,33 @@ def get_cursor():
    pyautogui.moveTo(current_x, current_y)
    # =====================================

    max_screenshot_attempts = 3
    for _screenshot_attempt in range(max_screenshot_attempts):
        try:
            cursor_obj = Xcursor()
            imgarray = cursor_obj.getCursorImageArrayFast()
            cursor_img = Image.fromarray(imgarray)

            # Taking screenshot after the wake-up
            screenshot = pyautogui.screenshot()

            cursor_x, cursor_y = pyautogui.position()
            screenshot.paste(cursor_img, (cursor_x, cursor_y), cursor_img)
            screenshot.save(file_path)
            break  # Success
        except Exception as e:
            logger.warning(f"Screenshot attempt {_screenshot_attempt + 1}/{max_screenshot_attempts} failed: {e}")
            # Clean up stale temp files that may cause PIL errors
            import glob
            for tmp_png in glob.glob("/tmp/tmp*.png"):
                try:
                    os.remove(tmp_png)
                except OSError:
                    pass
            if _screenshot_attempt == max_screenshot_attempts - 1:
                logger.error(f"All {max_screenshot_attempts} screenshot attempts failed, returning error")
                return jsonify({"status": "error", "message": f"Screenshot failed: {e}"}), 503
            time.sleep(0.5)
elif user_platform == "Darwin":  # (Mac OS)
    # Use the screencapture utility to capture the screen with the cursor
    subprocess.run(["screencapture", "-C", file_path])
@@ -3773,4 +3790,4 @@ def run_bash_script():
pass

if __name__ == '__main__':
    app.run(debug=False, host="0.0.0.0")
153 changes: 126 additions & 27 deletions cua/README.md
@@ -1,51 +1,150 @@
# CUA Data Collection

The pipeline runs one **planner** model (Qwen3-VL-235B, the goal-generation policy, tp=8) on a non-reserved GPU node and one or more **actor** nodes (UI-TARS-1.5-7B, the action-generation policy, tp=4) on reserved GPU nodes with KVM-accelerated Linux VMs for data collection. All nodes use the combined `cua-vllm-0.13.0.sqsh` container image (vLLM/CUDA + QEMU/KVM).

**Why `enroot exec`?** The srun enroot container loses `/dev/kvm` write access. Running `enroot exec` from outside the container retains it. Actor nodes use the SSH+enroot holder-job pattern for this reason.

## Automated Full Run

```bash
cd scripts
bash run.sh
```

This:
1. Submits a planner sbatch job (Qwen3-VL-235B, 8 GPUs, non-reserved)
2. Waits for the planner to start and write its hostname to a coordination file
3. Launches `NUM_ACTORS` actor instances, each submitting its own holder job on a reserved node
4. Each actor starts UI-TARS-1.5-7B vLLM + data collection VMs inside its container
5. Waits for all actors to finish, then cancels the planner
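Step 2's handshake can be sketched as a small shell function. Everything concrete below (the file path, the timeout) is an assumption for illustration, not taken from `run.sh`:

```shell
# Hypothetical sketch of step 2: block until the planner job has written its
# hostname to a coordination file, then print that hostname.
# The default timeout and file handling are assumptions, not run.sh's values.
wait_for_planner() {
    coord_file=$1
    timeout=${2:-600}
    elapsed=0
    while [ ! -s "$coord_file" ]; do   # file missing or still empty
        sleep 1
        elapsed=$((elapsed + 1))
        if [ "$elapsed" -ge "$timeout" ]; then
            echo "planner did not come up within ${timeout}s" >&2
            return 1
        fi
    done
    cat "$coord_file"
}
```

Each actor can then capture the printed hostname (e.g. into `$PLANNER_NODE`) before starting data collection.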

### Environment Variables

| Variable | Default | Description |
|----------|---------|-------------|
| `NUM_ACTORS` | 1 | Number of actor nodes to launch |
| `MAX_PARALLEL` | 16 | Parallel VMs per actor node |
| `MAX_TRAJECTORIES` | 10000 | Trajectories to collect per actor |

Each actor independently collects up to `MAX_TRAJECTORIES`. With `NUM_ACTORS=3, MAX_TRAJECTORIES=500`, you get up to 1500 total.

### Examples

```bash
# Single actor, defaults
bash run.sh

# 3 actors, 4 parallel VMs each, 500 trajectories each
NUM_ACTORS=3 MAX_PARALLEL=4 MAX_TRAJECTORIES=500 bash run.sh
```

### Logs

All logs go to `scripts/logs_multi_thread/`:

| File | Contents |
|------|----------|
| `planner-<jobid>.out` | Planner vLLM server output |
| `actor_launcher_<N>.log` | Actor N lifecycle (job submission, container polling, SSH exec) |
| `actor_<N>-<jobid>.out` | Actor N vLLM + data collection output |
| `vllm_actor_<N>.log` | Actor N vLLM server detailed logs |

### SLURM Configuration

| Component | Account | Partition | Reservation |
|-----------|---------|-----------|-------------|
| Planner | `nvr_lacr_llm` | `interactive` | none |
| Actor | `llmservice_fm_vision` | `interactive` | `sla_res_osworld_agent_vlm` |

## Interactive Debugging

### 1. Start the Planner vLLM Server

In a separate terminal, submit the planner on its own 8-GPU node:

```bash
IMAGE="/lustre/fsw/portfolios/nvr/users/bcui/images/cua-vllm-0.13.0.sqsh"
PLANNER_MODEL="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/models/Qwen3-VL-235B-A22B-Thinking"

srun --job-name=planner \
--account=nvr_lacr_llm \
--partition=interactive \
--gpus-per-node=8 \
--nodes=1 --ntasks-per-node=1 \
--time=04:00:00 --exclusive \
--container-image=$IMAGE \
--container-mounts=/lustre:/lustre \
bash -c "vllm serve $PLANNER_MODEL \
--api-key gen \
--tensor-parallel-size 8 \
--enable-expert-parallel \
--limit-mm-per-prompt.video 0 \
--limit-mm-per-prompt.image 3 \
--async-scheduling \
--max-model-len 65536 \
--gpu-memory-utilization 0.9"
```

Note which node it lands on via `squeue -u $USER` — you'll need this as `$PLANNER_NODE` later (e.g. `pool0-2838`).

### 2. Get an Interactive Shell with KVM

```bash
cd scripts
bash debug_interactive.sh
```

This allocates a GPU interactive node (8 GPUs) with the combined container image. The script:
1. Submits a background `sleep infinity` job to reserve the node
2. Waits for the enroot container to be ready
3. SSHs into the node and runs `enroot exec` to enter the container

When you exit the shell (`exit`), the script automatically cancels the SLURM job via a cleanup trap.
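The trap-on-exit pattern can be illustrated with a generic helper. This is a sketch, not `debug_interactive.sh`'s actual code: run a command in a subshell whose `trap ... EXIT` always fires, the same way the script cancels its SLURM job when the shell closes.

```shell
# Sketch of the cleanup-trap pattern (illustrative helper, not the real
# script). The subshell guarantees the cleanup command runs whether the
# wrapped command succeeds or fails.
run_with_cleanup() {
    cleanup_cmd=$1; shift
    ( trap "$cleanup_cmd" EXIT; "$@" )
}

# In the real script the cleanup command would be along the lines of
# `scancel "$JOBID"`; here a harmless echo stands in.
run_with_cleanup 'echo "cleanup fired"' echo "doing work"
# prints "doing work" then "cleanup fired"
```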

### 3. Start the Actor vLLM Server

Inside the container shell, start UI-TARS-1.5-7B in the background:

```bash
vllm serve ByteDance-Seed/UI-TARS-1.5-7B \
--api-key gen \
--tensor-parallel-size 4 \
--limit-mm-per-prompt.image 5 \
--limit-mm-per-prompt.video 0 \
--max-model-len 65536 &
```

Wait for it to be healthy:
```bash
curl http://localhost:8000/health
```
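A single `curl` can race the server's startup, so in practice you may want to poll. A generic retry wrapper (a sketch, not part of the repo) could look like:

```shell
# Poll a command until it succeeds or attempts run out.
# Generic sketch; the health URL in the usage comment is the vLLM
# default port used elsewhere in this README.
poll_until() {
    tries=$1; shift
    i=0
    while [ "$i" -lt "$tries" ]; do
        "$@" && return 0
        i=$((i + 1))
        sleep 1
    done
    return 1
}

# e.g. wait up to ~2 minutes for the actor server:
# poll_until 120 curl -sf http://localhost:8000/health
```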

### 4. Run Debug Data Collection

Still inside the container:

```bash
cd /lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server/cua
source cua_env_reqs/bin/activate
export PYTHONPATH=/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server:$PYTHONPATH

python debug_collect_trajectories.py \
--planner_node $PLANNER_NODE \
--actor_node localhost
```

`$PLANNER_NODE` is the node from step 1. The actor is `localhost` since it's running on the same node.

## Scripts Reference

| Script | Purpose |
|--------|---------|
| `run.sh` | Multi-actor launcher (1 planner + N actors) |
| `run_planner.sbatch` | Planner vLLM server sbatch job |
| `run_actor_and_vm.sh` | Single actor launcher (SSH+enroot pattern) |
| `run_all.sbatch` | Legacy consolidated 2-node sbatch (kept for reference) |
| `debug_interactive.sh` | Interactive GPU shell with KVM |
| `debug_check_kvm.sbatch` | Verify KVM works on GPU nodes (inside container) |
| `check_kvm_cpu.sbatch` | Verify KVM on CPU nodes (legacy) |
| `check_kvm_bash.sh` | Quick KVM write-permission test |
| `run_models.sbatch` | Start both model servers on 2 GPU nodes (standalone) |
112 changes: 112 additions & 0 deletions cua/cleanup_nvcf.py
@@ -0,0 +1,112 @@
#!/usr/bin/env python3
"""List and clean up NVCF functions.

Usage:
    # List all functions in the org:
    python cleanup_nvcf.py --list

    # Undeploy and delete only YOUR pool functions (nvcf-pool-*):
    python cleanup_nvcf.py --cleanup

    # Undeploy and delete ALL functions (careful — includes other users'):
    python cleanup_nvcf.py --cleanup --all

    # Undeploy and delete a specific function:
    python cleanup_nvcf.py --delete FUNCTION_ID VERSION_ID
"""
import argparse
import os
import sys

sys.path.insert(0, "/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server")

from openhands.nvidia.os_world.nvcf import OSWorldDeployer

POOL_NAME_PREFIX = "nvcf-pool-"


def main():
    parser = argparse.ArgumentParser(description="NVCF function cleanup utility")
    parser.add_argument("--list", action="store_true", help="List all functions in the org")
    parser.add_argument("--cleanup", action="store_true",
                        help="Undeploy and delete pool functions (nvcf-pool-* only, unless --all)")
    parser.add_argument("--all", action="store_true",
                        help="With --cleanup: delete ALL functions, not just pool ones")
    parser.add_argument("--delete", nargs=2, metavar=("FUNC_ID", "VER_ID"),
                        help="Delete a specific function")
    args = parser.parse_args()

    api_key = os.environ.get("NGC_API_KEY")
    org = os.environ.get("NGC_ORG")
    if not api_key or not org:
        print("ERROR: Set NGC_API_KEY and NGC_ORG environment variables")
        sys.exit(1)

    deployer = OSWorldDeployer(api_key=api_key, org_name=org)

    if args.list or (not args.cleanup and not args.delete):
        print("Listing all private NVCF functions in org...\n")
        result = deployer.list_functions()
        functions = result.get("functions", [])
        if not functions:
            print("No functions found.")
            return
        for fn in functions:
            fn_id = fn.get("id", "?")
            name = fn.get("name", "?")
            status = fn.get("status", "?")
            ver_id = fn.get("versionId", "?")
            mine = " <-- pool" if name.startswith(POOL_NAME_PREFIX) else ""
            print(f"  {name:40s} status={status:10s} fn={fn_id} ver={ver_id}{mine}")
        print(f"\nTotal: {len(functions)} functions")

    if args.delete:
        fn_id, ver_id = args.delete
        print(f"Undeploying {fn_id}...")
        try:
            deployer.undeploy(fn_id, ver_id, graceful=True)
            print("Undeployed. Deleting...")
        except Exception as e:
            print(f"Undeploy failed (may already be undeployed): {e}")
        try:
            deployer.delete_function(fn_id, ver_id)
            print("Deleted.")
        except Exception as e:
            print(f"Delete failed: {e}")

    if args.cleanup:
        result = deployer.list_functions()
        functions = result.get("functions", [])

        if not args.all:
            # Only clean up pool functions
            functions = [f for f in functions if f.get("name", "").startswith(POOL_NAME_PREFIX)]
            print(f"Cleaning up {len(functions)} pool functions (nvcf-pool-*)...\n")
        else:
            print(f"Cleaning up ALL {len(functions)} functions...\n")

        if not functions:
            print("Nothing to clean up.")
            return

        for fn in functions:
            fn_id = fn.get("id", "?")
            ver_id = fn.get("versionId", "?")
            name = fn.get("name", "?")
            status = fn.get("status", "?")
            print(f"  Cleaning up: {name} ({fn_id}) status={status}")
            try:
                deployer.undeploy(fn_id, ver_id, graceful=True)
                print("    Undeployed")
            except Exception as e:
                print(f"    Undeploy skipped: {e}")
            try:
                deployer.delete_function(fn_id, ver_id)
                print("    Deleted")
            except Exception as e:
                print(f"    Delete failed: {e}")
        print(f"\nDone. Cleaned up {len(functions)} functions.")


if __name__ == "__main__":
    main()