From 7d455255b0d64d96a60849722ec89629845d2f1d Mon Sep 17 00:00:00 2001 From: vsoch Date: Fri, 1 May 2026 00:27:39 -0700 Subject: [PATCH 1/6] feat: add snakemake provider Signed-off-by: vsoch --- resource_secretary/providers/__init__.py | 12 +- resource_secretary/providers/provider.py | 14 +- .../providers/workflow/__init__.py | 1 + .../providers/workflow/snakemake.py | 898 ++++++++++++++++++ 4 files changed, 922 insertions(+), 3 deletions(-) create mode 100644 resource_secretary/providers/workflow/__init__.py create mode 100644 resource_secretary/providers/workflow/snakemake.py diff --git a/resource_secretary/providers/__init__.py b/resource_secretary/providers/__init__.py index 9ce9266..2fa4d89 100644 --- a/resource_secretary/providers/__init__.py +++ b/resource_secretary/providers/__init__.py @@ -44,7 +44,14 @@ def find_provider_classes(root_path=None): yield category, obj -def discover_providers(probe=True, path=None) -> Dict[str, List[BaseProvider]]: +def discover_catalogs(probe=True, path=None): + """ + Discover catalogs, user specific and asked for groups of tools. + """ + return discover_providers(probe=probe, path=path, is_catalog=True) + + +def discover_providers(probe=True, path=None, is_catalog=False) -> Dict[str, List[BaseProvider]]: """ Recursively discovers and probes providers in subdirectories. Returns a dictionary categorized by the subdirectory name. @@ -61,6 +68,9 @@ def discover_providers(probe=True, path=None) -> Dict[str, List[BaseProvider]]: # that might have accidentally left is_provider=True try: instance = cls() + # Loading catalogs or not + if instance.is_catalog != is_catalog: + continue instance.category = category if not probe or instance.probe(): if category not in catalog: diff --git a/resource_secretary/providers/provider.py b/resource_secretary/providers/provider.py index b2ea20d..d855a67 100644 --- a/resource_secretary/providers/provider.py +++ b/resource_secretary/providers/provider.py @@ -16,6 +16,7 @@ def wrapper(*args, **kwargs): # Attach categorical metadata wrapper.is_tool = True + # Right now this can be secretary or dispatch wrapper.tool_category = category @@ -52,7 +53,6 @@ def wrapper(*args, **kwargs): def secretary_tool(func: Callable): """ - Labels a method as a tool for discovery and status retrieval. Used by the secretary agent primarily. We don't want the secretary to make system changes, it's like read only. """ @@ -61,12 +61,18 @@ def secretary_tool(func: Callable): def dispatch_tool(func: Callable): """ - Labels a method as a tool for actions and state changes. Used by the dispatcher agent to actually interact with a system (submit, cancel, etc). """ return _base_tool_decorator(func, "dispatch") +def workflow_tool(func: Callable): + """ + Used by the workflow agent to execute workflow steps. + """ + return _base_tool_decorator(func, "workflow") + + class BaseProvider: """ BaseProvider is that - a base provider class. Note that we set is_provider by default @@ -78,6 +84,10 @@ class BaseProvider: is_provider = True + # A catalog needs to be requested on demand, usually because + # it requires additional work at probe (e.g., snakemake wrappers) + is_catalog = False + def __init__(self, *args, **kwargs): self.tools: Dict[str, Callable] = {} self.category: str = "unknown" diff --git a/resource_secretary/providers/workflow/__init__.py b/resource_secretary/providers/workflow/__init__.py new file mode 100644 index 0000000..43ed7d9 --- /dev/null +++ b/resource_secretary/providers/workflow/__init__.py @@ -0,0 +1 @@ +from .snakemake import SnakemakeProvider diff --git a/resource_secretary/providers/workflow/snakemake.py b/resource_secretary/providers/workflow/snakemake.py new file mode 100644 index 0000000..fb9fdc1 --- /dev/null +++ b/resource_secretary/providers/workflow/snakemake.py @@ -0,0 +1,898 @@ +import os +import shutil +import subprocess +import tempfile +from pathlib import Path +from typing import Any, Dict, List, Optional + +import yaml + +from ..provider import BaseProvider, workflow_tool + +# Module-level environment configuration, read at import time +SNAKEMAKE_WORK_DIR = os.environ.get("RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR") +SNAKEMAKE_INPUT_DIR = os.environ.get("RESOURCE_SECRETARY_SNAKEMAKE_INPUT") +SNAKEMAKE_WRAPPER_VERSION = os.environ.get("RESOURCE_SECRETARY_SNAKEMAKE_WRAPPER_VERSION", "master") +SNAKEMAKE_WRAPPER_CLONE = os.environ.get("RESOURCE_SECRETARY_SNAKEMAKE_WRAPPER_CLONE") + +print(f" SNAKEMAKE_WORK_DIR: {SNAKEMAKE_WORK_DIR}") +print(f" SNAKEMAKE_INPUT_DIR: {SNAKEMAKE_INPUT_DIR}") +print(f" SNAKEMAKE_WRAPPER_VERSION: {SNAKEMAKE_WRAPPER_VERSION}") + + +def _validate_path(path: str, mode: str = "read") -> tuple[Optional[Path], Optional[str]]: + """ + Resolves a path and enforces sandbox boundaries. + Read access: anywhere under WORK_DIR (covers both input/ and steps/). + Write access: only under WORK_DIR/steps/. + Returns (Path, None) on success or (None, error_string) on failure. + """ + if not SNAKEMAKE_WORK_DIR: + return None, "RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR is not set." + + try: + p = Path(path).resolve() + work_p = Path(SNAKEMAKE_WORK_DIR).resolve() + + if mode == "write": + steps_p = work_p / "steps" + if not p.is_relative_to(steps_p): + return None, ( + f"Security Error: Write access denied. " + f"Path '{path}' is outside WORK_DIR/steps/." + ) + return p, None + + if mode == "read": + if p.is_relative_to(work_p): + return p, None + return None, ( + f"Security Error: Read access denied. " f"Path '{path}' is outside WORK_DIR." + ) + + except Exception as e: + return None, f"Path resolution error: {e}" + + return None, "Unknown path error." + + +def _dir_listing(root: Path) -> List[Dict[str, Any]]: + """Returns a recursive sorted listing of a directory.""" + items = [] + for p in sorted(root.rglob("*")): + items.append( + { + "path": str(p.relative_to(root)), + "type": "dir" if p.is_dir() else "file", + "size_bytes": p.stat().st_size if p.is_file() else 0, + } + ) + return items + + +class SnakemakeProvider(BaseProvider): + """ + Workflow catalog provider that exposes snakemake-wrappers as executable + tools for an agent. On probe, clones the wrappers repository, builds a + full metadata index, and stages input data into WORK_DIR/input/ via + symlinks. Provides search, inspection, step-by-step execution, and + rollback tools. Must be explicitly requested as a catalog because probe + performs a git clone and filesystem staging. + """ + + is_catalog = True + + def __init__(self, repo_url: str = "https://github.com/snakemake/snakemake-wrappers"): + super().__init__() + self.repo_url = repo_url + self.repo_path: Optional[Path] = None + self.available: bool = False + self._index: Dict[str, Dict[str, Any]] = {} + self._conda_frontend: Optional[str] = None + + # Execution state + self._history = [] + # Each entry: {rule_name, step_dir (relative to WORK_DIR), snakefile_bytes} + + @property + def name(self) -> str: + return "snakemake" + + @property + def metadata(self) -> Dict[str, Any]: + return { + "name": self.name, + "wrapper_count": len(self._index), + "repo_path": str(self.repo_path), + "wrapper_version": SNAKEMAKE_WRAPPER_VERSION, + "work_dir": SNAKEMAKE_WORK_DIR, + "input_dir": SNAKEMAKE_INPUT_DIR, + "available": self.available, + "steps_completed": len(self._history), + } + + def probe(self) -> bool: + """ + Ensures snakemake and git are installed, clones the wrappers repository + to a session-scoped temp directory, builds the wrapper index, and stages + input data into WORK_DIR/input/ via symlinks. + """ + if not shutil.which("snakemake"): + raise ValueError(" SnakemakeProvider: snakemake not found in PATH.") + if not shutil.which("git"): + raise ValueError(" SnakemakeProvider: git not found in PATH.") + if not SNAKEMAKE_WORK_DIR: + raise ValueError( + " SnakemakeProvider: RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR is not set." + ) + if not shutil.which("mamba") and not shutil.which("conda"): + raise ValueError( + " SnakemakeProvider: neither mamba nor conda found in PATH. One is required for --use-conda wrapper execution." + ) + self._check_wrapper_utils() + self._conda_frontend = "mamba" if shutil.which("mamba") else "conda" + + if SNAKEMAKE_WRAPPER_CLONE is not None and os.path.exists(SNAKEMAKE_WRAPPER_CLONE): + self.repo_url = SNAKEMAKE_WRAPPER_CLONE + else: + self.repo_path = self.clone_snakemake() + + self._build_index() + self._setup_work_dir() + self.available = True + return self.available + + def clone_snakemake(self): + """ + Clone snakemake to a temporary directory + """ + tmp_dir = Path(tempfile.gettempdir()) / "snakemake_wrappers_catalog" + if not tmp_dir.exists(): + print(f" SnakemakeProvider: cloning wrappers to {tmp_dir} ...") + subprocess.run( + ["git", "clone", "--depth", "1", self.repo_url, str(tmp_dir)], + check=True, + capture_output=True, + ) + else: + print(f" SnakemakeProvider: wrappers repo exists at {tmp_dir}, pulling latest.") + subprocess.run( + ["git", "-C", str(tmp_dir), "pull"], + capture_output=True, + ) + return tmp_dir + + def _check_wrapper_utils(self) -> bool: + """ + Ensures snakemake-wrapper-utils is installed, installing it if not. + Returns True if available after check, False if install failed. + """ + try: + import snakemake_wrapper_utils + except ImportError: + raise ValueError( + " SnakemakeProvider: snakemake-wrapper-utils not found. Install first." + ) + + def _setup_work_dir(self): + """ + Creates the canonical WORK_DIR structure and symlinks INPUT_DIR + contents into WORK_DIR/input/, preserving directory structure. + """ + work_p = Path(SNAKEMAKE_WORK_DIR) + input_stage = work_p / "input" + steps_dir = work_p / "steps" + logs_dir = work_p / "logs" + + for d in (work_p, input_stage, steps_dir, logs_dir): + d.mkdir(parents=True, exist_ok=True) + + # Symlink INPUT_DIR contents into input/ preserving structure + if SNAKEMAKE_INPUT_DIR: + input_src = Path(SNAKEMAKE_INPUT_DIR) + for src in sorted(input_src.rglob("*")): + rel = src.relative_to(input_src) + dest = input_stage / rel + if src.is_dir(): + dest.mkdir(parents=True, exist_ok=True) + elif src.is_file() and not dest.exists(): + dest.parent.mkdir(parents=True, exist_ok=True) + dest.symlink_to(src.resolve()) + + print(f" SnakemakeProvider: work dir staged at {work_p}") + + def _build_index(self): + """ + Walks the cloned wrappers repo and indexes every wrapper found via + meta.yaml. Collects name, description, authors, conda packages, + README, and test Snakefile example. Both regular wrappers and + meta-wrappers are indexed; type field distinguishes them. + """ + self._index = {} + for meta_path in self.repo_path.rglob("meta.yaml"): + wrapper_dir = meta_path.parent + wrapper_path = str(wrapper_dir.relative_to(self.repo_path)) + + try: + with open(meta_path) as f: + meta = yaml.safe_load(f) or {} + except Exception: + continue + + conda_packages = [] + env_path = wrapper_dir / "environment.yaml" + if env_path.exists(): + try: + with open(env_path) as f: + env = yaml.safe_load(f) or {} + conda_packages = env.get("dependencies", []) + except Exception: + pass + + readme = "" + for readme_name in ("README.md", "readme.md"): + readme_path = wrapper_dir / readme_name + if readme_path.exists(): + try: + readme = readme_path.read_text() + except Exception: + pass + break + + example_rule = "" + test_snakefile = wrapper_dir / "test" / "Snakefile" + if test_snakefile.exists(): + try: + example_rule = test_snakefile.read_text() + except Exception: + pass + + category = wrapper_path.split("/")[0] + wrapper_type = "meta_wrapper" if wrapper_path.startswith("meta/") else "wrapper" + + self._index[wrapper_path] = { + "path": wrapper_path, + "category": category, + "type": wrapper_type, + "name": meta.get("name", wrapper_path), + "description": meta.get("description", ""), + "authors": meta.get("authors", []), + "conda_packages": conda_packages, + "readme": readme, + "example_rule": example_rule, + } + + print(f" SnakemakeProvider: indexed {len(self._index)} wrappers.") + + @property + def _snakefile_path(self) -> Path: + return Path(SNAKEMAKE_WORK_DIR) / "Snakefile" + + @property + def _steps_dir(self) -> Path: + return Path(SNAKEMAKE_WORK_DIR) / "steps" + + def _next_step_dir(self, rule_name: str) -> Path: + """Returns the next numbered step directory path (not yet created).""" + n = len(self._history) + 1 + return self._steps_dir / f"{n:02d}_{rule_name}" + + def _append_rule(self, rule_text: str): + """Appends a rule block to the accumulating Snakefile.""" + with open(self._snakefile_path, "a") as f: + f.write("\n\n") + f.write(rule_text) + + def _snakefile_size(self) -> int: + """Returns current Snakefile byte length, 0 if not yet created.""" + if self._snakefile_path.exists(): + return self._snakefile_path.stat().st_size + return 0 + + def _run_snakemake(self, targets: List[str]) -> subprocess.CompletedProcess: + return subprocess.run( + [ + "snakemake", + "--use-conda", + "--conda-frontend", + self._conda_frontend, + "--cores", + "1", + "--snakefile", + str(self._snakefile_path), + ] + + targets, + capture_output=True, + text=True, + cwd=SNAKEMAKE_WORK_DIR, + ) + + def _resolve_input_paths(self, input: Dict[str, Any]) -> Dict[str, str]: + """ + Resolves input paths relative to WORK_DIR. + Paths starting with 'steps/' are resolved relative to WORK_DIR. + All other paths are assumed relative to WORK_DIR/input/. + Supports list values for inputs that expect multiple files (e.g. index sets). + """ + resolved = {} + work_p = Path(SNAKEMAKE_WORK_DIR) + for k, v in input.items(): + if isinstance(v, list): + resolved_list = [] + for item in v: + item = str(item) + if item.startswith("steps/"): + resolved_list.append(str(work_p / item)) + else: + resolved_list.append(str(work_p / "input" / item)) + resolved[k] = resolved_list + else: + v = str(v) + if v.startswith("steps/"): + resolved[k] = str(work_p / v) + else: + resolved[k] = str(work_p / "input" / v) + return resolved + + def _resolve_output_paths(self, output: Dict[str, Any], step_dir: Path) -> Dict[str, str]: + """ + Resolves output paths relative to the step's output directory. + Supports list values for outputs that produce multiple files. + """ + resolved = {} + for k, v in output.items(): + if isinstance(v, list): + resolved[k] = [str(step_dir / str(item)) for item in v] + else: + resolved[k] = str(step_dir / str(v)) + return resolved + + def _build_rule_lines( + self, + rule_name: str, + input: Dict[str, Any], + output: Dict[str, Any], + params: Optional[Dict[str, Any]], + threads: int, + log: Optional[str], + step_dir: Path, + ) -> tuple[List[str], Dict[str, str]]: + """ + Builds the shared input/output/params/threads/log portion of a rule + with all paths fully resolved. Returns (lines, resolved_output). + Supports list values for inputs and outputs that expect multiple files. + """ + resolved_input = self._resolve_input_paths(input) + resolved_output = self._resolve_output_paths(output, step_dir) + + lines = [f"rule {rule_name}:"] + + lines.append(" input:") + for k, v in resolved_input.items(): + if isinstance(v, list): + items = ", ".join(f'"{item}"' for item in v) + lines.append(f" {k}=[{items}],") + else: + lines.append(f' {k}="{v}",') + + lines.append(" output:") + for k, v in resolved_output.items(): + if isinstance(v, list): + items = ", ".join(f'"{item}"' for item in v) + lines.append(f" {k}=[{items}],") + else: + lines.append(f' {k}="{v}",') + + if params: + lines.append(" params:") + for k, v in params.items(): + val = f'"{v}"' if isinstance(v, str) else str(v) + lines.append(f" {k}={val},") + + if log: + log_path = str(Path(SNAKEMAKE_WORK_DIR) / "logs" / f"{rule_name}.log") + lines.append(f' log: "{log_path}"') + + lines.append(f" threads: {threads}") + return lines, resolved_output + + def _execute( + self, + rule_name: str, + rule_text: str, + resolved_output: Dict[str, str], + step_dir: Path, + ) -> Dict[str, Any]: + """ + Shared execution path for both execute_wrapper and execute_rule. + """ + # Auto-rollback if this rule_name or step_dir already exists + existing_rule_names = [h["rule_name"] for h in self._history] + if rule_name in existing_rule_names or step_dir.exists(): + rollback_result = self.rollback_step() + if not rollback_result["success"]: + return { + "success": False, + "error": ( + f"Rule '{rule_name}' already exists and auto-rollback failed: " + f"{rollback_result.get('error')}" + ), + } + # Recalculate step_dir after rollback since history length changed + step_dir = self._next_step_dir(rule_name) + + # Validate all resolved output paths + for v in resolved_output.values(): + _, err = _validate_path(v, mode="write") + if err: + return {"success": False, "error": err} + + pre_append_size = self._snakefile_size() + step_dir.mkdir(parents=True, exist_ok=True) + self._append_rule(rule_text) + self._history.append( + { + "rule_name": rule_name, + "step_dir": str(step_dir.relative_to(SNAKEMAKE_WORK_DIR)), + "snakefile_bytes": pre_append_size, + } + ) + + targets = list(resolved_output.values()) + result = self._run_snakemake(targets) + + if result.stdout: + print(result.stdout) + if result.stderr: + print(result.stderr) + return { + "success": result.returncode == 0, + "returncode": result.returncode, + "stdout": result.stdout, + "stderr": result.stderr, + "auto_rolled_back": rule_name in existing_rule_names or step_dir.exists(), + "step_dir": str(step_dir.relative_to(SNAKEMAKE_WORK_DIR)), + "work_dir_listing": _dir_listing(Path(SNAKEMAKE_WORK_DIR) / "steps"), + } + + @workflow_tool + def get_environment(self) -> Dict[str, Any]: + """ + Returns the agent's runtime environment: the work directory structure, + path conventions, wrapper version, and current step history. Call this + first at the start of every session before doing anything else. + + Takes no arguments. + + Returns a dictionary containing: + - work_dir_structure: description of WORK_DIR layout + - path_conventions: rules the agent must follow for all file paths + - wrapper_version: the snakemake-wrappers version in use + - steps_completed: number of steps executed so far this session + - history: ordered list of steps executed with their step directories + - available: whether the provider is ready to use + """ + return { + "success": True, + "work_dir_structure": { + "input": "WORK_DIR/input/ — staged input data (read-only, do not write here)", + "steps": "WORK_DIR/steps/ — one subdirectory per executed step (NN_rulename/)", + "logs": "WORK_DIR/logs/ — per-rule log files written automatically", + "snakefile": "WORK_DIR/Snakefile — accumulating workflow, appended each step", + }, + "path_conventions": { + "input_files": ( + "Relative to WORK_DIR/input/. " + "Example: 'samples/A.fastq' resolves to WORK_DIR/input/samples/A.fastq" + ), + "output_files": ( + "Relative to this step's directory. " + "Example: 'A.bam' resolves to WORK_DIR/steps/NN_rulename/A.bam" + ), + "chained_inputs": ( + "To use a prior step's output as input, prefix with 'steps/NN_rulename/'. " + "Example: 'steps/01_bwa_mem_A/A.bam'" + ), + "never_use": ( + "Absolute paths, environment variable names, or bare filenames " + "without the correct prefix." + ), + }, + "wrapper_version": SNAKEMAKE_WRAPPER_VERSION, + "steps_completed": len(self._history), + "history": [ + {"step": i + 1, "rule_name": h["rule_name"], "step_dir": h["step_dir"]} + for i, h in enumerate(self._history) + ], + "available": self.available, + } + + @workflow_tool + def list_input_dir(self) -> Dict[str, Any]: + """ + Lists all files staged in WORK_DIR/input/ recursively. Call this + during Phase 1 (discovery) to understand what input data is available + before planning the workflow. + + Takes no arguments. + + Returns a dictionary containing: + - root: the base directory listed ('input/') + - items: list of dicts with 'path' (relative to input/), 'type' + (file or dir), and 'size_bytes'. Use these paths directly as + input values when calling execute_wrapper or execute_rule. + """ + input_dir = Path(SNAKEMAKE_WORK_DIR) / "input" + if not input_dir.exists(): + return {"success": False, "error": "Input staging directory does not exist."} + + return { + "success": True, + "root": "input/", + "items": _dir_listing(input_dir), + } + + @workflow_tool + def list_work_dir(self) -> Dict[str, Any]: + """ + Lists all files in WORK_DIR/steps/ recursively, showing what outputs + have been produced so far. Call this after each execution step to + verify expected outputs exist before proceeding to the next step. + + Takes no arguments. + + Returns a dictionary containing: + - root: the base directory listed ('steps/') + - items: list of dicts with 'path' (relative to steps/), 'type' + (file or dir), and 'size_bytes' + - history: ordered list of steps executed this session, each with + 'step' (number), 'rule_name', and 'step_dir' + """ + if not self.available: + return {"success": False, "error": "Snakemake provider is not available."} + + steps_dir = Path(SNAKEMAKE_WORK_DIR) / "steps" + if not steps_dir.exists(): + return {"success": False, "error": "Steps directory does not exist."} + + return { + "success": True, + "root": "steps/", + "items": _dir_listing(steps_dir), + "history": [ + {"step": i + 1, "rule_name": h["rule_name"], "step_dir": h["step_dir"]} + for i, h in enumerate(self._history) + ], + } + + @workflow_tool + def search_wrappers(self, query: str) -> Dict[str, Any]: + """ + Search the snakemake-wrappers catalog by keyword. + + Args: + query (str): Keyword or phrase to search. Searched across wrapper + path, name, description, category, and conda package names. + Examples: 'bwa', 'samtools sort', 'variant calling', 'fastqc'. + + Returns a dictionary containing: + - count: number of matches found + - results: list of dicts, each with 'path', 'name', 'type' + ('wrapper' or 'meta_wrapper'), 'description', and 'category'. + Use the 'path' value when calling get_wrapper_details or + execute_wrapper. Call get_wrapper_details on any candidate + before using it to see full documentation and example rules. + """ + if not self.available: + return {"success": False, "error": "Snakemake provider is not available."} + + query_lower = query.lower() + matches = [] + for wrapper_path, entry in self._index.items(): + searchable = " ".join( + [ + entry["path"], + entry["name"], + entry["description"], + entry["category"], + entry["type"], + " ".join(str(p) for p in entry["conda_packages"]), + ] + ).lower() + if query_lower in searchable: + matches.append( + { + "path": entry["path"], + "name": entry["name"], + "type": entry["type"], + "description": entry["description"], + "category": entry["category"], + } + ) + + return { + "success": True, + "count": len(matches), + "results": matches, + } + + @workflow_tool + def get_wrapper_details(self, wrapper_path: str) -> Dict[str, Any]: + """ + Returns full documentation for a wrapper. You MUST call this before + execute_wrapper for any wrapper you intend to use. + + Args: + wrapper_path (str): The catalog path of the wrapper, exactly as + returned in the 'path' field of search_wrappers results. + Examples: 'bio/bwa/mem', 'bio/samtools/sort', + 'meta/bio/bwa_mapping'. + + Returns a dictionary containing: + - name: human-readable wrapper name + - description: what the wrapper does + - authors: list of wrapper authors + - conda_packages: list of software dependencies installed automatically + - readme: full README text with usage notes + - example_rule: example Snakefile rule from the wrapper's test suite. + Inspect this carefully — the input/output key names and params + shown here are exactly what you must use in execute_wrapper. + - type: 'wrapper' or 'meta_wrapper' + """ + if not self.available: + return {"success": False, "error": "Snakemake provider is not available."} + + entry = self._index.get(wrapper_path) + if not entry: + return {"success": False, "error": f"Wrapper '{wrapper_path}' not found in index."} + + return {"success": True, **entry} + + @workflow_tool + def execute_wrapper( + self, + rule_name: str, + wrapper_path: str, + input: Dict[str, Any], + output: Dict[str, Any], + params: Optional[Dict[str, Any]] = None, + threads: int = 1, + log: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Executes a single workflow step using a snakemake-wrappers catalog + wrapper or meta-wrapper. The rule is appended to the accumulating + Snakefile and executed immediately. Outputs are written to + WORK_DIR/steps/NN_rulename/. + + Args: + rule_name (str): A unique snake_case identifier for this rule. + Must be a valid Python identifier. Used to name the step + directory (WORK_DIR/steps/NN_rulename/). Examples: + 'bwa_mem_A', 'samtools_sort_sample_B'. + + wrapper_path (str): Catalog path of the wrapper exactly as + returned by search_wrappers. Examples: 'bio/bwa/mem', + 'bio/samtools/sort', 'meta/bio/bwa_mapping'. + + input (dict): Mapping of input argument names to file paths. + Key names MUST match those shown in the wrapper's example + rule (from get_wrapper_details). Path conventions: + - Staged input files: relative to WORK_DIR/input/. + Example: {"reads": "samples/A.fastq", "ref": "genome.fa"} + - Prior step outputs: prefix with 'steps/NN_rulename/'. + Example: {"bam": "steps/01_bwa_mem_A/A.bam"} + Never use absolute paths or environment variable names. + + output (dict): Mapping of output argument names to file paths. + Key names MUST match those shown in the wrapper's example + rule. Paths are relative to this step's directory + (WORK_DIR/steps/NN_rulename/). + Example: {"bam": "A.bam"} writes to + WORK_DIR/steps/NN_rulename/A.bam. + Never use absolute paths or environment variable names. + + params (dict, optional): Mapping of parameter names to values + as shown in the wrapper's example rule. Values can be + strings, integers, or booleans. + Example: {"sorting": "samtools", "sort_order": "coordinate"}. + + threads (int, optional): Number of threads to allocate. + Defaults to 1. + + log (str, optional): Pass any truthy value to enable logging. + Log written automatically to WORK_DIR/logs/{rule_name}.log. + + Returns a dictionary containing: + - success (bool): True if snakemake exited with returncode 0 + - returncode (int): snakemake process return code + - stdout (str): snakemake standard output + - stderr (str): snakemake standard error — inspect this carefully + on failure to understand what went wrong + - step_dir (str): path of this step's output directory relative + to WORK_DIR, e.g. 'steps/01_bwa_mem_A' + - work_dir_listing: current recursive listing of WORK_DIR/steps/ + + Always check 'success' before proceeding to the next step. + Use rollback_step to undo this step if you want to retry with + different arguments. + """ + if not self.available: + return {"success": False, "error": "Snakemake provider is not available."} + + entry = self._index.get(wrapper_path) + if not entry: + return {"success": False, "error": f"Wrapper '{wrapper_path}' not found in index."} + + step_dir = self._next_step_dir(rule_name) + lines, resolved_output = self._build_rule_lines( + rule_name, input, output, params, threads, log, step_dir + ) + directive = "meta_wrapper" if entry["type"] == "meta_wrapper" else "wrapper" + lines.append(f' {directive}: "{SNAKEMAKE_WRAPPER_VERSION}/{wrapper_path}"') + rule_text = "\n".join(lines) + + return self._execute(rule_name, rule_text, resolved_output, step_dir) + + @workflow_tool + def execute_rule( + self, + rule_name: str, + input: Dict[str, Any], + output: Dict[str, Any], + params: Optional[Dict[str, Any]] = None, + threads: int = 1, + log: Optional[str] = None, + shell: Optional[str] = None, + run: Optional[str] = None, + script: Optional[str] = None, + ) -> Dict[str, Any]: + """ + Executes a single custom workflow step written by the agent. Use this + for glue steps that have no suitable catalog wrapper. Exactly one of + shell, run, or script must be provided. Outputs go to + WORK_DIR/steps/NN_rulename/. + + Args: + rule_name (str): A unique snake_case identifier for this rule. + Must be a valid Python identifier. Used to name the step + directory (WORK_DIR/steps/NN_rulename/). Examples: + 'convert_sam_to_bam', 'rename_output_A'. + + input (dict): Mapping of input argument names to file paths. + Path conventions: + - Staged input files: relative to WORK_DIR/input/. + Example: {"reads": "samples/A.fastq"} + - Prior step outputs: prefix with 'steps/NN_rulename/'. + Example: {"bam": "steps/01_bwa_mem_A/A.bam"} + Never use absolute paths or environment variable names. + + output (dict): Mapping of output argument names to file paths. + Paths are relative to this step's directory + (WORK_DIR/steps/NN_rulename/). + Example: {"sorted": "A.sorted.bam"} writes to + WORK_DIR/steps/NN_rulename/A.sorted.bam. + Never use absolute paths or environment variable names. + + params (dict, optional): Mapping of parameter names to values. + Accessible in shell commands as {params.key_name}. + Example: {"min_mapq": 20}. + + threads (int, optional): Number of threads to allocate. + Defaults to 1. + + log (str, optional): Pass any truthy value to enable logging. + Log written automatically to WORK_DIR/logs/{rule_name}.log. + + shell (str, optional): A shell command string using Snakemake + placeholder syntax to reference inputs, outputs, and params. + Example: "bwa mem {input.ref} {input.reads} | samtools view + -Sb - > {output.bam}". + Exactly one of shell, run, or script must be provided. + + run (str, optional): A block of Python code executed directly. + Has access to the snakemake object for inputs/outputs/params. + Example: "import shutil\nshutil.copy(snakemake.input[0], + snakemake.output[0])". + Exactly one of shell, run, or script must be provided. + + script (str, optional): Path to an external script file relative + to WORK_DIR. The script receives a snakemake object + automatically. Exactly one of shell, run, or script must + be provided. + + Returns a dictionary containing: + - success (bool): True if snakemake exited with returncode 0 + - returncode (int): snakemake process return code + - stdout (str): snakemake standard output + - stderr (str): snakemake standard error — inspect this carefully + on failure to understand what went wrong + - step_dir (str): path of this step's output directory relative + to WORK_DIR, e.g. 'steps/02_samtools_sort_A' + - work_dir_listing: current recursive listing of WORK_DIR/steps/ + + Always check 'success' before proceeding to the next step. + Use rollback_step to undo this step if you want to retry with + different arguments. + """ + if not self.available: + return {"success": False, "error": "Snakemake provider is not available."} + + directives = [x for x in (shell, run, script) if x is not None] + if len(directives) != 1: + return { + "success": False, + "error": "Exactly one of 'shell', 'run', or 'script' must be provided.", + } + + step_dir = self._next_step_dir(rule_name) + lines, resolved_output = self._build_rule_lines( + rule_name, input, output, params, threads, log, step_dir + ) + + if shell is not None: + lines.append(f' shell: "{shell}"') + elif script is not None: + lines.append(f' script: "{script}"') + elif run is not None: + lines.append(" run:") + for line in run.splitlines(): + lines.append(f" {line}") + + rule_text = "\n".join(lines) + return self._execute(rule_name, rule_text, resolved_output, step_dir) + + @workflow_tool + def rollback_step(self) -> Dict[str, Any]: + """ + Rolls back the most recently executed step. Removes the step's output + directory from WORK_DIR/steps/, truncates the Snakefile to remove the + appended rule, and pops the step from history. + + Takes no arguments. Always rolls back exactly one step (the most recent). + + Use this when a step has failed and you want to try a different wrapper, + different parameters, or a custom rule instead. After rolling back, the + next call to execute_wrapper or execute_rule will reuse the same step + number. + + Returns a dictionary containing: + - success (bool): True if rollback completed successfully + - rolled_back (str): rule_name of the step that was removed + - step_dir_removed (str): the step directory that was deleted, + relative to WORK_DIR + - snakefile_truncated_to_bytes (int): Snakefile size after truncation + - steps_remaining (int): number of steps still in history + - history: updated ordered list of remaining steps + """ + if not self.available: + return {"success": False, "error": "Snakemake provider is not available."} + + if not self._history: + return {"success": False, "error": "No steps to roll back."} + + entry = self._history.pop() + rule_name = entry["rule_name"] + step_dir = Path(SNAKEMAKE_WORK_DIR) / entry["step_dir"] + pre_size = entry["snakefile_bytes"] + + # Remove step output directory + if step_dir.exists(): + shutil.rmtree(step_dir) + + # Truncate Snakefile back to pre-append size + if self._snakefile_path.exists(): + with open(self._snakefile_path, "r+") as f: + f.truncate(pre_size) + + return { + "success": True, + "rolled_back": rule_name, + "step_dir_removed": str(entry["step_dir"]), + "snakefile_truncated_to_bytes": pre_size, + "steps_remaining": len(self._history), + "history": [ + {"step": i + 1, "rule_name": h["rule_name"], "step_dir": h["step_dir"]} + for i, h in enumerate(self._history) + ], + } From 4507d3b8dbe65f25c07e2236714c74a3684ec0ca Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 11 May 2026 08:00:34 -0700 Subject: [PATCH 2/6] feat: snakemake additional functions Signed-off-by: vsoch --- .../providers/workflow/snakemake.py | 169 +++++++++++++++++- 1 file changed, 162 insertions(+), 7 deletions(-) diff --git a/resource_secretary/providers/workflow/snakemake.py b/resource_secretary/providers/workflow/snakemake.py index fb9fdc1..929474f 100644 --- a/resource_secretary/providers/workflow/snakemake.py +++ b/resource_secretary/providers/workflow/snakemake.py @@ -273,18 +273,24 @@ def _steps_dir(self) -> Path: return Path(SNAKEMAKE_WORK_DIR) / "steps" def _next_step_dir(self, rule_name: str) -> Path: - """Returns the next numbered step directory path (not yet created).""" + """ + Returns the next numbered step directory path (not yet created). + """ n = len(self._history) + 1 return self._steps_dir / f"{n:02d}_{rule_name}" def _append_rule(self, rule_text: str): - """Appends a rule block to the accumulating Snakefile.""" + """ + Appends a rule block to the accumulating Snakefile. + """ with open(self._snakefile_path, "a") as f: f.write("\n\n") f.write(rule_text) def _snakefile_size(self) -> int: - """Returns current Snakefile byte length, 0 if not yet created.""" + """ + Returns current Snakefile byte length, 0 if not yet created. + """ if self._snakefile_path.exists(): return self._snakefile_path.stat().st_size return 0 @@ -334,6 +340,132 @@ def _resolve_input_paths(self, input: Dict[str, Any]) -> Dict[str, str]: resolved[k] = str(work_p / "input" / v) return resolved + def _rebuild_from_content(self, rules_to_keep: List[tuple[str, str]]): + """ + Helper to rewrite the Snakefile and update internal history based + on a list of (rule_name, rule_text) blocks. + """ + # Clear existing file + self._snakefile_path.write_text("") + + # We need to rebuild history. + # Note: This logic assumes the step directories already exist on disk. + new_history = [] + + for name, text in rules_to_keep: + pre_size = self._snakefile_size() + + # Find the existing history entry to preserve the step_dir mapping + # if it exists, otherwise we'd lose the link to the folder. + old_entry = next((h for h in self._history if h["rule_name"] == name), None) + + with open(self._snakefile_path, "a") as f: + f.write("\n\n" + text.strip()) + + if old_entry: + new_history.append( + { + "rule_name": name, + "step_dir": old_entry["step_dir"], + "snakefile_bytes": pre_size, + } + ) + + self._history = new_history + + @workflow_tool + def delete_rule(self, rule_name: str, delete_data: bool = False) -> Dict[str, Any]: + """ + Removes a specific rule from the Snakefile by name. + + Args: + rule_name (str): The name of the rule to delete. + delete_data (bool): If True, also deletes the associated + 'steps/NN_rule_name' directory. Defaults to False. + + Returns: + - success: True if the rule was found and removed. + """ + if not self._snakefile_path.exists(): + return {"success": False, "error": "Snakefile does not exist."} + + content = self._snakefile_path.read_text() + # Split content into blocks starting with "rule " + # We use a simple split and filter to keep rule logic together + raw_blocks = content.split("\n\n") + new_blocks = [] + found = False + + for block in raw_blocks: + if block.strip().startswith(f"rule {rule_name}:"): + found = True + continue + if block.strip(): + # Extract rule name for the rebuilder + name = block.strip().splitlines()[0].replace("rule ", "").replace(":", "").strip() + new_blocks.append((name, block)) + + if not found: + return {"success": False, "error": f"Rule '{rule_name}' not found."} + + # Handle data deletion if requested + if delete_data: + entry = next((h for h in self._history if h["rule_name"] == rule_name), None) + if entry: + step_p = Path(SNAKEMAKE_WORK_DIR) / entry["step_dir"] + if step_p.exists(): + shutil.rmtree(step_p) + + self._rebuild_from_content(new_blocks) + return {"success": True, "message": f"Rule '{rule_name}' removed."} + + @workflow_tool + def deduplicate_rules(self) -> Dict[str, Any]: + """ + Parses the Snakefile and finds rules with duplicate names. + It keeps only the LAST occurrence of any duplicate rule and + removes the others. This is useful for cleaning up failed retries. + + Returns: + - success: True if deduplication completed. + - removed_count: Number of duplicate rules removed. + """ + if not self._snakefile_path.exists(): + return {"success": True, "removed_count": 0} + + content = self._snakefile_path.read_text() + raw_blocks = content.split("\n\n") + + # Map rule_name -> last_seen_content + unique_rules = {} + order = [] # To keep track of the first time we saw a rule to preserve order + + count_before = 0 + for block in raw_blocks: + clean = block.strip() + if not clean.startswith("rule "): + continue + + count_before += 1 + name = clean.splitlines()[0].replace("rule ", "").replace(":", "").strip() + + if name not in unique_rules: + order.append(name) + unique_rules[name] = block + + # Reconstruct blocks based on the order they first appeared, + # but using the content of the last appearance. + final_blocks = [(name, unique_rules[name]) for name in order] + + removed_count = count_before - len(final_blocks) + self._rebuild_from_content(final_blocks) + + return { + "success": True, + "removed_count": removed_count, + "current_rule_count": len(final_blocks), + } + def _resolve_output_paths(self, output: Dict[str, Any], step_dir: Path) -> Dict[str, str]: """ Resolves output paths relative to the step's output directory. @@ -843,6 +975,29 @@ def execute_rule( return self._execute(rule_name, rule_text, resolved_output, step_dir) @workflow_tool + def view_snakefile(self) -> Dict[str, Any]: + """ + Returns the full content of the current Snakefile. + Use this to inspect the sequence of rules and their parameters. + + Returns a dictionary containing: + - success (bool): True if file was read + - content (str): The full text of the Snakefile + - rule_count (int): Number of rules currently defined + """ + if not self._snakefile_path.exists(): + return {"success": True, "content": "", "rule_count": 0} + + try: + content = self._snakefile_path.read_text() + # Basic count by looking for the "rule " keyword at the start of lines + rule_count = len( + [line for line in content.splitlines() if line.strip().startswith("rule ")] + ) + return {"success": True, "content": content, "rule_count": rule_count} + except Exception as e: + return {"success": False, "error": str(e)} + def rollback_step(self) -> Dict[str, Any]: """ Rolls back the most recently executed step. Removes the step's output @@ -851,10 +1006,10 @@ def rollback_step(self) -> Dict[str, Any]: Takes no arguments. Always rolls back exactly one step (the most recent). - Use this when a step has failed and you want to try a different wrapper, - different parameters, or a custom rule instead. After rolling back, the - next call to execute_wrapper or execute_rule will reuse the same step - number. + By default, when a step fails, we roll back for you. You must ONLY call + this tool when you want an additional rollback. Use this when you want to + redo a step that was not rolled back. You MUST inspect the Snakefile to + confirm first. Returns a dictionary containing: - success (bool): True if rollback completed successfully From 8d358d1c8e61c7a9fcd944c3fbe133b722946a29 Mon Sep 17 00:00:00 2001 From: vsoch Date: Fri, 15 May 2026 10:55:39 -0700 Subject: [PATCH 3/6] snakemake: updates after n=20 test Signed-off-by: vsoch --- .../providers/workflow/snakemake.py | 385 ++++++++---------- 1 file changed, 173 insertions(+), 212 deletions(-) diff --git a/resource_secretary/providers/workflow/snakemake.py b/resource_secretary/providers/workflow/snakemake.py index 929474f..e3391ac 100644 --- a/resource_secretary/providers/workflow/snakemake.py +++ b/resource_secretary/providers/workflow/snakemake.py @@ -7,6 +7,8 @@ import yaml +import resource_secretary.utils as utils + from ..provider import BaseProvider, workflow_tool # Module-level environment configuration, read at import time @@ -19,6 +21,45 @@ print(f" SNAKEMAKE_INPUT_DIR: {SNAKEMAKE_INPUT_DIR}") print(f" SNAKEMAKE_WRAPPER_VERSION: {SNAKEMAKE_WRAPPER_VERSION}") +SHARED_EXECUTE_DOCSTRING = """ + Args: + rule_name (str): A unique snake_case identifier for this rule. + Used to name the step directory (WORK_DIR/steps/NN_rulename/). + Examples: 'bwa_mem_A', 'samtools_sort_sample_B'. + + input (dict): Mapping of input argument names to file paths. + Path conventions: + - Staged input files: relative to WORK_DIR/input/. + Example: {"reads": ["samples/A_1.fastq", "samples/A_2.fastq"]} + - Prior step outputs: prefix with 'steps/NN_rulename/'. + Example: {"bam": "steps/01_bwa_mem_A/A.bam"} + Never use absolute paths or environment variable names. + *BIO TIP*: If a tool needs multiple index files, pass them as a list! + Example: {"idx": ["genome.fa.amb", "genome.fa.ann", "genome.fa.bwt"]} + + output (dict): Mapping of output argument names to file paths. + Paths are relative to this step's directory (WORK_DIR/steps/NN_rulename/). + Example: {"bam": "A.bam"} writes to WORK_DIR/steps/NN_rulename/A.bam. + + params (dict, optional): Mapping of parameter names to values. + *BIO TIP*: For parameters requiring tabs (like BWA Read Groups), + escape them carefully using double backslashes or spaces. + Example: {"extra": r"-R '@RG\\tID:A\\tSM:A'"} + + threads (int, optional): Number of threads to allocate. Defaults to 1. + cores (int, optional): Number of cores to allocate. Defaults to 1. + log (str, optional): Pass any truthy value to enable automatic logging. + + Returns a dictionary containing: + - success (bool): True if snakemake exited with returncode 0 + - returncode (int): snakemake process return code + - stdout/stderr (str): Process output. Inspect carefully on failure. + - step_dir (str): path of this step's output directory. + - work_dir_listing: current recursive listing of WORK_DIR/steps/ + + Always check 'success' before proceeding to the next step. +""" + def _validate_path(path: str, mode: str = "read") -> tuple[Optional[Path], Optional[str]]: """ @@ -57,7 +98,9 @@ def _validate_path(path: str, mode: str = "read") -> tuple[Optional[Path], Optio def _dir_listing(root: Path) -> List[Dict[str, Any]]: - """Returns a recursive sorted listing of a directory.""" + """ + Returns a recursive sorted listing of a directory. + """ items = [] for p in sorted(root.rglob("*")): items.append( @@ -90,7 +133,7 @@ def __init__(self, repo_url: str = "https://github.com/snakemake/snakemake-wrapp self._index: Dict[str, Dict[str, Any]] = {} self._conda_frontend: Optional[str] = None - # Execution state + # Execution state - AFTER a step is run. self._history = [] # Each entry: {rule_name, step_dir (relative to WORK_DIR), snakefile_bytes} @@ -111,27 +154,36 @@ def metadata(self) -> Dict[str, Any]: "steps_completed": len(self._history), } + def get_required_software(self, name, required=True): + """ + Use shutil to find required (or fail) + """ + path = shutil.which(name) + if not path and required: + raise ValueError(f" SnakemakeProvider: {name} not found in PATH.") + return path + def probe(self) -> bool: """ Ensures snakemake and git are installed, clones the wrappers repository to a session-scoped temp directory, builds the wrapper index, and stages input data into WORK_DIR/input/ via symlinks. """ - if not shutil.which("snakemake"): - raise ValueError(" SnakemakeProvider: snakemake not found in PATH.") - if not shutil.which("git"): - raise ValueError(" SnakemakeProvider: git not found in PATH.") + self._snakemake = self.get_required_software("snakemake") + self._git = self.get_required_software("git") + self._conda = self.get_required_software( + "mamba", required=False + ) or self.get_required_software("conda") + self._conda_frontend = "mamba" if "mamba" in self._conda else "conda" + if not SNAKEMAKE_WORK_DIR: raise ValueError( " SnakemakeProvider: RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR is not set." ) - if not shutil.which("mamba") and not shutil.which("conda"): - raise ValueError( - " SnakemakeProvider: neither mamba nor conda found in PATH. One is required for --use-conda wrapper execution." - ) + self._check_wrapper_utils() - self._conda_frontend = "mamba" if shutil.which("mamba") else "conda" + # Ensure we have cloned wrappers if SNAKEMAKE_WRAPPER_CLONE is not None and os.path.exists(SNAKEMAKE_WRAPPER_CLONE): self.repo_url = SNAKEMAKE_WRAPPER_CLONE else: @@ -144,22 +196,15 @@ def probe(self) -> bool: def clone_snakemake(self): """ - Clone snakemake to a temporary directory + Clone snakemake to a temporary directory or update already cloned """ tmp_dir = Path(tempfile.gettempdir()) / "snakemake_wrappers_catalog" if not tmp_dir.exists(): - print(f" SnakemakeProvider: cloning wrappers to {tmp_dir} ...") - subprocess.run( - ["git", "clone", "--depth", "1", self.repo_url, str(tmp_dir)], - check=True, - capture_output=True, - ) + command = [self._git, "clone", "--depth", "1", self.repo_url, str(tmp_dir)] else: - print(f" SnakemakeProvider: wrappers repo exists at {tmp_dir}, pulling latest.") - subprocess.run( - ["git", "-C", str(tmp_dir), "pull"], - capture_output=True, - ) + command = [self._git, "-C", str(tmp_dir), "pull"] + print(f" SnakemakeProvider: cloning or updating wrappers at {tmp_dir} ...") + subprocess.run(command, check=True, capture_output=True) return tmp_dir def _check_wrapper_utils(self) -> bool: @@ -212,40 +257,34 @@ def _build_index(self): for meta_path in self.repo_path.rglob("meta.yaml"): wrapper_dir = meta_path.parent wrapper_path = str(wrapper_dir.relative_to(self.repo_path)) - try: - with open(meta_path) as f: - meta = yaml.safe_load(f) or {} + meta = utils.read_yaml(meta_path) or {} except Exception: - continue + meta = {} + print(f"Issue reading {meta_path}") - conda_packages = [] env_path = wrapper_dir / "environment.yaml" - if env_path.exists(): - try: - with open(env_path) as f: - env = yaml.safe_load(f) or {} - conda_packages = env.get("dependencies", []) - except Exception: - pass + try: + env = utils.read_yaml(str(env_path)) or {} + conda_packages = env.get("dependencies", []) + except Exception: + print(f"Issue reading {env_path}") + conda_packages = [] - readme = "" + # No print - unlikely to exist for readme_name in ("README.md", "readme.md"): readme_path = wrapper_dir / readme_name - if readme_path.exists(): - try: - readme = readme_path.read_text() - except Exception: - pass - break - - example_rule = "" - test_snakefile = wrapper_dir / "test" / "Snakefile" - if test_snakefile.exists(): try: - example_rule = test_snakefile.read_text() + readme = readme_path.read_text() except Exception: - pass + readme = "" + + test_snakefile = wrapper_dir / "test" / "Snakefile" + try: + example_rule = test_snakefile.read_text() + except Exception: + print(f"Issue reading {test_snakefile}") + example_rule = "" category = wrapper_path.split("/")[0] wrapper_type = "meta_wrapper" if wrapper_path.startswith("meta/") else "wrapper" @@ -276,8 +315,17 @@ def _next_step_dir(self, rule_name: str) -> Path: """ Returns the next numbered step directory path (not yet created). """ - n = len(self._history) + 1 - return self._steps_dir / f"{n:02d}_{rule_name}" + existing_numbers = [0] + for h in self._history: + try: + # Extract '01' from 'steps/01_rule_name' + basename = Path(h["step_dir"]).name + existing_numbers.append(int(basename.split("_")[0])) + except ValueError: + pass + + next_num = max(existing_numbers) + 1 + return self._steps_dir / f"{next_num:02d}_{rule_name}" def _append_rule(self, rule_text: str): """ @@ -295,15 +343,18 @@ def _snakefile_size(self) -> int: return self._snakefile_path.stat().st_size return 0 - def _run_snakemake(self, targets: List[str]) -> subprocess.CompletedProcess: + def _run_snakemake(self, targets: List[str], cores: int = 1) -> subprocess.CompletedProcess: + """ + Underlying helper to run snakemake with subprocess + """ return subprocess.run( [ - "snakemake", + self._snakemake, "--use-conda", "--conda-frontend", self._conda_frontend, "--cores", - "1", + str(cores), "--snakefile", str(self._snakefile_path), ] @@ -354,35 +405,27 @@ def _rebuild_from_content(self, rules_to_keep: List[tuple[str, str]]): for name, text in rules_to_keep: pre_size = self._snakefile_size() - - # Find the existing history entry to preserve the step_dir mapping - # if it exists, otherwise we'd lose the link to the folder. old_entry = next((h for h in self._history if h["rule_name"] == name), None) - + if old_entry is None: + continue with open(self._snakefile_path, "a") as f: f.write("\n\n" + text.strip()) - - if old_entry: - new_history.append( - { - "rule_name": name, - "step_dir": old_entry["step_dir"], - "snakefile_bytes": pre_size, - } - ) - + new_history.append( + { + "rule_name": name, + "step_dir": old_entry["step_dir"], + "snakefile_bytes": pre_size, + } + ) self._history = new_history @workflow_tool - def delete_rule(self, rule_name: str, delete_data: bool = False) -> Dict[str, Any]: + def delete_rule(self, rule_name: str) -> Dict[str, Any]: """ Removes a specific rule from the Snakefile by name. Args: rule_name (str): The name of the rule to delete. - delete_data (bool): If True, also deletes the associated - 'steps/NN_rule_name' directory. Defaults to False. - Returns: - success: True if the rule was found and removed. """ @@ -391,7 +434,7 @@ def delete_rule(self, rule_name: str, delete_data: bool = False) -> Dict[str, An content = self._snakefile_path.read_text() # Split content into blocks starting with "rule " - # We use a simple split and filter to keep rule logic together + # Use a simple split and filter to keep rule logic together raw_blocks = content.split("\n\n") new_blocks = [] found = False @@ -408,13 +451,12 @@ def delete_rule(self, rule_name: str, delete_data: bool = False) -> Dict[str, An if not found: return {"success": False, "error": f"Rule '{rule_name}' not found."} - # Handle data deletion if requested - if delete_data: - entry = next((h for h in self._history if h["rule_name"] == rule_name), None) - if entry: - step_p = Path(SNAKEMAKE_WORK_DIR) / entry["step_dir"] - if step_p.exists(): - shutil.rmtree(step_p) + # Handle data deletion + entry = next((h for h in self._history if h["rule_name"] == rule_name), None) + if entry: + step_p = Path(SNAKEMAKE_WORK_DIR) / entry["step_dir"] + if step_p.exists(): + shutil.rmtree(step_p) self._rebuild_from_content(new_blocks) return {"success": True, "message": f"Rule '{rule_name}' removed."} @@ -438,7 +480,7 @@ def deduplicate_rules(self) -> Dict[str, Any]: # Map rule_name -> last_seen_content unique_rules = {} - order = [] # To keep track of the first time we saw a rule to preserve order + order = [] # keep track of the first time we saw a rule to preserve order count_before = 0 for block in raw_blocks: @@ -453,7 +495,7 @@ def deduplicate_rules(self) -> Dict[str, Any]: order.append(name) unique_rules[name] = block - # Reconstruct blocks based on the order they first appeared, + # Reconstruct blocks based on the order they first appeared # but using the content of the last appearance. final_blocks = [(name, unique_rules[name]) for name in order] @@ -464,6 +506,7 @@ def deduplicate_rules(self) -> Dict[str, Any]: "success": True, "removed_count": removed_count, "current_rule_count": len(final_blocks), + "message": f"Deduplication complete. Removed {removed_count} duplicate rules. View the snakefile to ensure required steps were not lost.", } def _resolve_output_paths(self, output: Dict[str, Any], step_dir: Path) -> Dict[str, str]: @@ -479,6 +522,19 @@ def _resolve_output_paths(self, output: Dict[str, Any], step_dir: Path) -> Dict[ resolved[k] = str(step_dir / str(v)) return resolved + def _add_rule_lines(self, name, resolved, lines): + """ + shared logic for adding rule lines (input or output) + """ + lines.append(f" {name}:") + for k, v in resolved.items(): + if isinstance(v, list): + items = ", ".join(f'"{item}"' for item in v) + lines.append(f" {k}=[{items}],") + else: + lines.append(f' {k}="{v}",') + return lines + def _build_rule_lines( self, rule_name: str, @@ -494,31 +550,19 @@ def _build_rule_lines( with all paths fully resolved. Returns (lines, resolved_output). Supports list values for inputs and outputs that expect multiple files. """ + # Inputs and outputs resolved_input = self._resolve_input_paths(input) resolved_output = self._resolve_output_paths(output, step_dir) - lines = [f"rule {rule_name}:"] - - lines.append(" input:") - for k, v in resolved_input.items(): - if isinstance(v, list): - items = ", ".join(f'"{item}"' for item in v) - lines.append(f" {k}=[{items}],") - else: - lines.append(f' {k}="{v}",') - - lines.append(" output:") - for k, v in resolved_output.items(): - if isinstance(v, list): - items = ", ".join(f'"{item}"' for item in v) - lines.append(f" {k}=[{items}],") - else: - lines.append(f' {k}="{v}",') + lines = self._add_rule_lines("input", resolved_input, lines) + lines = self._add_rule_lines("output", resolved_output, lines) if params: lines.append(" params:") for k, v in params.items(): - val = f'"{v}"' if isinstance(v, str) else str(v) + # This line was giving trouble when the agent asked for tab \t. + # I think repr (raw) should work + val = repr(v) if isinstance(v, str) else str(v) lines.append(f" {k}={val},") if log: @@ -534,24 +578,17 @@ def _execute( rule_text: str, resolved_output: Dict[str, str], step_dir: Path, + cores: int = 1, ) -> Dict[str, Any]: """ Shared execution path for both execute_wrapper and execute_rule. """ - # Auto-rollback if this rule_name or step_dir already exists existing_rule_names = [h["rule_name"] for h in self._history] if rule_name in existing_rule_names or step_dir.exists(): - rollback_result = self.rollback_step() - if not rollback_result["success"]: - return { - "success": False, - "error": ( - f"Rule '{rule_name}' already exists and auto-rollback failed: " - f"{rollback_result.get('error')}" - ), - } - # Recalculate step_dir after rollback since history length changed - step_dir = self._next_step_dir(rule_name) + return { + "success": False, + "error": f"Rule '{rule_name}' already exists. Pick a new name, or use delete_rule('{rule_name}') first.", + } # Validate all resolved output paths for v in resolved_output.values(): @@ -571,7 +608,7 @@ def _execute( ) targets = list(resolved_output.values()) - result = self._run_snakemake(targets) + result = self._run_snakemake(targets, cores=cores) if result.stdout: print(result.stdout) @@ -789,68 +826,30 @@ def execute_wrapper( output: Dict[str, Any], params: Optional[Dict[str, Any]] = None, threads: int = 1, + cores: int = 1, log: Optional[str] = None, ) -> Dict[str, Any]: - """ + f""" Executes a single workflow step using a snakemake-wrappers catalog wrapper or meta-wrapper. The rule is appended to the accumulating Snakefile and executed immediately. Outputs are written to WORK_DIR/steps/NN_rulename/. - Args: - rule_name (str): A unique snake_case identifier for this rule. - Must be a valid Python identifier. Used to name the step - directory (WORK_DIR/steps/NN_rulename/). Examples: - 'bwa_mem_A', 'samtools_sort_sample_B'. - - wrapper_path (str): Catalog path of the wrapper exactly as - returned by search_wrappers. Examples: 'bio/bwa/mem', - 'bio/samtools/sort', 'meta/bio/bwa_mapping'. - - input (dict): Mapping of input argument names to file paths. - Key names MUST match those shown in the wrapper's example - rule (from get_wrapper_details). Path conventions: - - Staged input files: relative to WORK_DIR/input/. - Example: {"reads": "samples/A.fastq", "ref": "genome.fa"} - - Prior step outputs: prefix with 'steps/NN_rulename/'. - Example: {"bam": "steps/01_bwa_mem_A/A.bam"} - Never use absolute paths or environment variable names. - - output (dict): Mapping of output argument names to file paths. - Key names MUST match those shown in the wrapper's example - rule. Paths are relative to this step's directory - (WORK_DIR/steps/NN_rulename/). - Example: {"bam": "A.bam"} writes to - WORK_DIR/steps/NN_rulename/A.bam. - Never use absolute paths or environment variable names. - - params (dict, optional): Mapping of parameter names to values - as shown in the wrapper's example rule. Values can be - strings, integers, or booleans. - Example: {"sorting": "samtools", "sort_order": "coordinate"}. - - threads (int, optional): Number of threads to allocate. - Defaults to 1. - - log (str, optional): Pass any truthy value to enable logging. - Log written automatically to WORK_DIR/logs/{rule_name}.log. - - Returns a dictionary containing: - - success (bool): True if snakemake exited with returncode 0 - - returncode (int): snakemake process return code - - stdout (str): snakemake standard output - - stderr (str): snakemake standard error — inspect this carefully - on failure to understand what went wrong - - step_dir (str): path of this step's output directory relative - to WORK_DIR, e.g. 'steps/01_bwa_mem_A' - - work_dir_listing: current recursive listing of WORK_DIR/steps/ + {SHARED_EXECUTE_DOCSTRING} Always check 'success' before proceeding to the next step. Use rollback_step to undo this step if you want to retry with different arguments. """ - if not self.available: - return {"success": False, "error": "Snakemake provider is not available."} + # Strip the version prefix if the agent accidentally included it + wrapper_prefix = f"{SNAKEMAKE_WRAPPER_VERSION}/" + if wrapper_path.startswith(wrapper_prefix): + wrapper_path = wrapper_path[len(wrapper_prefix) :] + + # Also catch hardcoded master/ branch + # TODO can this be other branches? Can we count "/" instead? + if wrapper_path.startswith("master/"): + wrapper_path = wrapper_path[7:] entry = self._index.get(wrapper_path) if not entry: @@ -864,7 +863,7 @@ def execute_wrapper( lines.append(f' {directive}: "{SNAKEMAKE_WRAPPER_VERSION}/{wrapper_path}"') rule_text = "\n".join(lines) - return self._execute(rule_name, rule_text, resolved_output, step_dir) + return self._execute(rule_name, rule_text, resolved_output, step_dir, cores=cores) @workflow_tool def execute_rule( @@ -874,53 +873,26 @@ def execute_rule( output: Dict[str, Any], params: Optional[Dict[str, Any]] = None, threads: int = 1, + cores: int = 1, log: Optional[str] = None, shell: Optional[str] = None, run: Optional[str] = None, script: Optional[str] = None, ) -> Dict[str, Any]: - """ + f""" Executes a single custom workflow step written by the agent. Use this for glue steps that have no suitable catalog wrapper. Exactly one of shell, run, or script must be provided. Outputs go to WORK_DIR/steps/NN_rulename/. - Args: - rule_name (str): A unique snake_case identifier for this rule. - Must be a valid Python identifier. Used to name the step - directory (WORK_DIR/steps/NN_rulename/). Examples: - 'convert_sam_to_bam', 'rename_output_A'. - - input (dict): Mapping of input argument names to file paths. - Path conventions: - - Staged input files: relative to WORK_DIR/input/. - Example: {"reads": "samples/A.fastq"} - - Prior step outputs: prefix with 'steps/NN_rulename/'. - Example: {"bam": "steps/01_bwa_mem_A/A.bam"} - Never use absolute paths or environment variable names. - - output (dict): Mapping of output argument names to file paths. - Paths are relative to this step's directory - (WORK_DIR/steps/NN_rulename/). - Example: {"sorted": "A.sorted.bam"} writes to - WORK_DIR/steps/NN_rulename/A.sorted.bam. - Never use absolute paths or environment variable names. - - params (dict, optional): Mapping of parameter names to values. - Accessible in shell commands as {params.key_name}. - Example: {"min_mapq": 20}. - - threads (int, optional): Number of threads to allocate. - Defaults to 1. - - log (str, optional): Pass any truthy value to enable logging. - Log written automatically to WORK_DIR/logs/{rule_name}.log. - shell (str, optional): A shell command string using Snakemake - placeholder syntax to reference inputs, outputs, and params. - Example: "bwa mem {input.ref} {input.reads} | samtools view - -Sb - > {output.bam}". - Exactly one of shell, run, or script must be provided. + placeholder syntax. IMPORTANT: Only binaries available in + the *system* PATH are accessible here. Bioinformatics tools + like bwa, samtools, bcftools, and picard are managed by + wrapper-specific conda environments and are NOT available + in shell rules. If you need those tools, use execute_wrapper + instead. Shell rules are only appropriate for standard + system utilities (cp, mv, ln, awk, python3, etc.). run (str, optional): A block of Python code executed directly. Has access to the snakemake object for inputs/outputs/params. @@ -933,15 +905,7 @@ def execute_rule( automatically. Exactly one of shell, run, or script must be provided. - Returns a dictionary containing: - - success (bool): True if snakemake exited with returncode 0 - - returncode (int): snakemake process return code - - stdout (str): snakemake standard output - - stderr (str): snakemake standard error — inspect this carefully - on failure to understand what went wrong - - step_dir (str): path of this step's output directory relative - to WORK_DIR, e.g. 'steps/02_samtools_sort_A' - - work_dir_listing: current recursive listing of WORK_DIR/steps/ + {SHARED_EXECUTE_DOCSTRING} Always check 'success' before proceeding to the next step. Use rollback_step to undo this step if you want to retry with @@ -972,7 +936,7 @@ def execute_rule( lines.append(f" {line}") rule_text = "\n".join(lines) - return self._execute(rule_name, rule_text, resolved_output, step_dir) + return self._execute(rule_name, rule_text, resolved_output, step_dir, cores=cores) @workflow_tool def view_snakefile(self) -> Dict[str, Any]: @@ -1020,9 +984,6 @@ def rollback_step(self) -> Dict[str, Any]: - steps_remaining (int): number of steps still in history - history: updated ordered list of remaining steps """ - if not self.available: - return {"success": False, "error": "Snakemake provider is not available."} - if not self._history: return {"success": False, "error": "No steps to roll back."} From 309577e1fd3882005e75b8b689bf455ec3f1815b Mon Sep 17 00:00:00 2001 From: vsoch Date: Sun, 17 May 2026 12:05:08 -0700 Subject: [PATCH 4/6] snakemake: clean up code and add samples generation Signed-off-by: vsoch --- .../providers/workflow/snakemake.py | 752 ++++++++---------- 1 file changed, 340 insertions(+), 412 deletions(-) diff --git a/resource_secretary/providers/workflow/snakemake.py b/resource_secretary/providers/workflow/snakemake.py index e3391ac..588102c 100644 --- a/resource_secretary/providers/workflow/snakemake.py +++ b/resource_secretary/providers/workflow/snakemake.py @@ -1,3 +1,4 @@ +import csv import os import shutil import subprocess @@ -11,53 +12,23 @@ from ..provider import BaseProvider, workflow_tool -# Module-level environment configuration, read at import time +# Module-level environment configuration SNAKEMAKE_WORK_DIR = os.environ.get("RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR") SNAKEMAKE_INPUT_DIR = os.environ.get("RESOURCE_SECRETARY_SNAKEMAKE_INPUT") SNAKEMAKE_WRAPPER_VERSION = os.environ.get("RESOURCE_SECRETARY_SNAKEMAKE_WRAPPER_VERSION", "master") SNAKEMAKE_WRAPPER_CLONE = os.environ.get("RESOURCE_SECRETARY_SNAKEMAKE_WRAPPER_CLONE") - -print(f" SNAKEMAKE_WORK_DIR: {SNAKEMAKE_WORK_DIR}") -print(f" SNAKEMAKE_INPUT_DIR: {SNAKEMAKE_INPUT_DIR}") -print(f" SNAKEMAKE_WRAPPER_VERSION: {SNAKEMAKE_WRAPPER_VERSION}") - SHARED_EXECUTE_DOCSTRING = """ Args: rule_name (str): A unique snake_case identifier for this rule. - Used to name the step directory (WORK_DIR/steps/NN_rulename/). - Examples: 'bwa_mem_A', 'samtools_sort_sample_B'. - input (dict): Mapping of input argument names to file paths. - Path conventions: - - Staged input files: relative to WORK_DIR/input/. - Example: {"reads": ["samples/A_1.fastq", "samples/A_2.fastq"]} - - Prior step outputs: prefix with 'steps/NN_rulename/'. - Example: {"bam": "steps/01_bwa_mem_A/A.bam"} - Never use absolute paths or environment variable names. - *BIO TIP*: If a tool needs multiple index files, pass them as a list! - Example: {"idx": ["genome.fa.amb", "genome.fa.ann", "genome.fa.bwt"]} - + - Staged files: relative to WORK_DIR/input/ (e.g. "samples/A_1.fastq") + - Prior step outputs: prefix with 'steps/NN_rulename/' output (dict): Mapping of output argument names to file paths. - Paths are relative to this step's directory (WORK_DIR/steps/NN_rulename/). - Example: {"bam": "A.bam"} writes to WORK_DIR/steps/NN_rulename/A.bam. - - params (dict, optional): Mapping of parameter names to values. - *BIO TIP*: For parameters requiring tabs (like BWA Read Groups), - escape them carefully using double backslashes or spaces. - Example: {"extra": r"-R '@RG\\tID:A\\tSM:A'"} - - threads (int, optional): Number of threads to allocate. Defaults to 1. - cores (int, optional): Number of cores to allocate. Defaults to 1. - log (str, optional): Pass any truthy value to enable automatic logging. - - Returns a dictionary containing: - - success (bool): True if snakemake exited with returncode 0 - - returncode (int): snakemake process return code - - stdout/stderr (str): Process output. Inspect carefully on failure. - - step_dir (str): path of this step's output directory. - - work_dir_listing: current recursive listing of WORK_DIR/steps/ - - Always check 'success' before proceeding to the next step. + - Relative to this step's directory. + params (dict, optional): Tool parameters. + threads (int, optional): Threads to allocate. + cores (int, optional): Cores to allocate. + log (str, optional): Pass truthy value to enable automatic logging. """ @@ -71,28 +42,22 @@ def _validate_path(path: str, mode: str = "read") -> tuple[Optional[Path], Optio if not SNAKEMAKE_WORK_DIR: return None, "RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR is not set." - try: - p = Path(path).resolve() - work_p = Path(SNAKEMAKE_WORK_DIR).resolve() - - if mode == "write": - steps_p = work_p / "steps" - if not p.is_relative_to(steps_p): - return None, ( - f"Security Error: Write access denied. " - f"Path '{path}' is outside WORK_DIR/steps/." - ) - return p, None + p = Path(path).resolve() + work_p = Path(SNAKEMAKE_WORK_DIR).resolve() - if mode == "read": - if p.is_relative_to(work_p): - return p, None - return None, ( - f"Security Error: Read access denied. " f"Path '{path}' is outside WORK_DIR." + if mode == "write": + steps_p = work_p / "steps" + if not p.is_relative_to(steps_p): + return ( + None, + f"Security Error: Write access denied. Path '{path}' is outside WORK_DIR/steps/.", ) + return p, None - except Exception as e: - return None, f"Path resolution error: {e}" + if mode == "read": + if p.is_relative_to(work_p): + return p, None + return None, f"Security Error: Read access denied. " f"Path '{path}' is outside WORK_DIR." return None, "Unknown path error." @@ -133,14 +98,39 @@ def __init__(self, repo_url: str = "https://github.com/snakemake/snakemake-wrapp self._index: Dict[str, Dict[str, Any]] = {} self._conda_frontend: Optional[str] = None - # Execution state - AFTER a step is run. - self._history = [] # Each entry: {rule_name, step_dir (relative to WORK_DIR), snakefile_bytes} + self._rules: List[Dict[str, str]] = [] + self._header: str = "" + self._parallel_mode = False @property def name(self) -> str: return "snakemake" + @property + def history(self) -> List[Dict[str, Any]]: + """ + Generates the history view dynamically from the rules list. + """ + return [ + {"step": i + 1, "rule_name": r["name"], "step_dir": r["step_dir"]} + for i, r in enumerate(self._rules) + ] + + @property + def _steps_dir(self) -> Path: + """ + The base directory for all step outputs. + """ + return Path(SNAKEMAKE_WORK_DIR) / "steps" + + @property + def snakefile_path(self) -> Path: + """ + The filesystem location of the Snakefile. + """ + return Path(SNAKEMAKE_WORK_DIR) / "Snakefile" + @property def metadata(self) -> Dict[str, Any]: return { @@ -151,63 +141,47 @@ def metadata(self) -> Dict[str, Any]: "work_dir": SNAKEMAKE_WORK_DIR, "input_dir": SNAKEMAKE_INPUT_DIR, "available": self.available, - "steps_completed": len(self._history), } - def get_required_software(self, name, required=True): - """ - Use shutil to find required (or fail) - """ - path = shutil.which(name) - if not path and required: - raise ValueError(f" SnakemakeProvider: {name} not found in PATH.") - return path - def probe(self) -> bool: """ Ensures snakemake and git are installed, clones the wrappers repository to a session-scoped temp directory, builds the wrapper index, and stages input data into WORK_DIR/input/ via symlinks. """ - self._snakemake = self.get_required_software("snakemake") - self._git = self.get_required_software("git") - self._conda = self.get_required_software( - "mamba", required=False - ) or self.get_required_software("conda") - self._conda_frontend = "mamba" if "mamba" in self._conda else "conda" - - if not SNAKEMAKE_WORK_DIR: - raise ValueError( - " SnakemakeProvider: RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR is not set." - ) + self._snakemake = shutil.which("snakemake") + self._git = shutil.which("git") + conda_bin = shutil.which("mamba") or shutil.which("conda") + if not (self._snakemake and self._git and conda_bin): + raise ValueError("Required software (snakemake, git, conda/mamba) not found.") self._check_wrapper_utils() + self._conda_frontend = "mamba" if "mamba" in conda_bin else "conda" + if not SNAKEMAKE_WORK_DIR: + raise ValueError("RESOURCE_SECRETARY_SNAKEMAKE_WORKDIR is not set.") - # Ensure we have cloned wrappers - if SNAKEMAKE_WRAPPER_CLONE is not None and os.path.exists(SNAKEMAKE_WRAPPER_CLONE): - self.repo_url = SNAKEMAKE_WRAPPER_CLONE - else: - self.repo_path = self.clone_snakemake() - + self.repo_path = self._setup_wrappers() self._build_index() self._setup_work_dir() self.available = True - return self.available + return True - def clone_snakemake(self): + def _setup_wrappers(self): """ - Clone snakemake to a temporary directory or update already cloned + Clone the snakemake wrappers """ - tmp_dir = Path(tempfile.gettempdir()) / "snakemake_wrappers_catalog" - if not tmp_dir.exists(): - command = [self._git, "clone", "--depth", "1", self.repo_url, str(tmp_dir)] + dest = Path(tempfile.gettempdir()) / "snakemake_wrappers_catalog" + if SNAKEMAKE_WRAPPER_CLONE and os.path.exists(SNAKEMAKE_WRAPPER_CLONE): + return Path(SNAKEMAKE_WRAPPER_CLONE) + + if not dest.exists(): + cmd = [self._git, "clone", "--depth", "1", self.repo_url, str(dest)] else: - command = [self._git, "-C", str(tmp_dir), "pull"] - print(f" SnakemakeProvider: cloning or updating wrappers at {tmp_dir} ...") - subprocess.run(command, check=True, capture_output=True) - return tmp_dir + cmd = [self._git, "-C", str(dest), "pull"] + subprocess.run(cmd, check=True, capture_output=True) + return dest - def _check_wrapper_utils(self) -> bool: + def _check_wrapper_utils(self): """ Ensures snakemake-wrapper-utils is installed, installing it if not. Returns True if available after check, False if install failed. @@ -225,27 +199,21 @@ def _setup_work_dir(self): contents into WORK_DIR/input/, preserving directory structure. """ work_p = Path(SNAKEMAKE_WORK_DIR) - input_stage = work_p / "input" - steps_dir = work_p / "steps" - logs_dir = work_p / "logs" + for d in ["input", "steps", "logs"]: + (work_p / d).mkdir(parents=True, exist_ok=True) - for d in (work_p, input_stage, steps_dir, logs_dir): - d.mkdir(parents=True, exist_ok=True) - - # Symlink INPUT_DIR contents into input/ preserving structure + # Create symlinks to actual data, allowing agent to muck with if SNAKEMAKE_INPUT_DIR: - input_src = Path(SNAKEMAKE_INPUT_DIR) - for src in sorted(input_src.rglob("*")): - rel = src.relative_to(input_src) - dest = input_stage / rel + src_root = Path(SNAKEMAKE_INPUT_DIR) + for src in src_root.rglob("*"): + rel = src.relative_to(src_root) + dest = work_p / "input" / rel if src.is_dir(): dest.mkdir(parents=True, exist_ok=True) elif src.is_file() and not dest.exists(): dest.parent.mkdir(parents=True, exist_ok=True) dest.symlink_to(src.resolve()) - print(f" SnakemakeProvider: work dir staged at {work_p}") - def _build_index(self): """ Walks the cloned wrappers repo and indexes every wrapper found via @@ -257,41 +225,46 @@ def _build_index(self): for meta_path in self.repo_path.rglob("meta.yaml"): wrapper_dir = meta_path.parent wrapper_path = str(wrapper_dir.relative_to(self.repo_path)) + + # Metadata try: meta = utils.read_yaml(meta_path) or {} except Exception: meta = {} - print(f"Issue reading {meta_path}") + # Conda environments + conda_packages = [] env_path = wrapper_dir / "environment.yaml" - try: - env = utils.read_yaml(str(env_path)) or {} - conda_packages = env.get("dependencies", []) - except Exception: - print(f"Issue reading {env_path}") - conda_packages = [] + if env_path.exists(): + try: + env = utils.read_yaml(env_path) or {} + conda_packages = env.get("dependencies", []) + except Exception: + pass - # No print - unlikely to exist + # README and example + readme = "" for readme_name in ("README.md", "readme.md"): readme_path = wrapper_dir / readme_name + if readme_path.exists(): + try: + readme = readme_path.read_text() + break + except Exception: + pass + + example_rule = "" + test_snakefile = wrapper_dir / "test" / "Snakefile" + if test_snakefile.exists(): try: - readme = readme_path.read_text() + example_rule = test_snakefile.read_text() except Exception: - readme = "" + pass - test_snakefile = wrapper_dir / "test" / "Snakefile" - try: - example_rule = test_snakefile.read_text() - except Exception: - print(f"Issue reading {test_snakefile}") - example_rule = "" - - category = wrapper_path.split("/")[0] wrapper_type = "meta_wrapper" if wrapper_path.startswith("meta/") else "wrapper" - self._index[wrapper_path] = { "path": wrapper_path, - "category": category, + "category": wrapper_path.split("/")[0], "type": wrapper_type, "name": meta.get("name", wrapper_path), "description": meta.get("description", ""), @@ -300,48 +273,32 @@ def _build_index(self): "readme": readme, "example_rule": example_rule, } - print(f" SnakemakeProvider: indexed {len(self._index)} wrappers.") - @property - def _snakefile_path(self) -> Path: - return Path(SNAKEMAKE_WORK_DIR) / "Snakefile" - - @property - def _steps_dir(self) -> Path: - return Path(SNAKEMAKE_WORK_DIR) / "steps" - - def _next_step_dir(self, rule_name: str) -> Path: + def _rebuild_snakefile(self): """ - Returns the next numbered step directory path (not yet created). + Assembles the Snakefile from its three components. """ - existing_numbers = [0] - for h in self._history: - try: - # Extract '01' from 'steps/01_rule_name' - basename = Path(h["step_dir"]).name - existing_numbers.append(int(basename.split("_")[0])) - except ValueError: - pass + parts = [] + if self._header: + parts.append(self._header.strip()) - next_num = max(existing_numbers) + 1 - return self._steps_dir / f"{next_num:02d}_{rule_name}" + # Target the last rule in the list + if self._rules: + last_rule = self._rules[-1] + target_lines = ["rule all:", " input:"] - def _append_rule(self, rule_text: str): - """ - Appends a rule block to the accumulating Snakefile. - """ - with open(self._snakefile_path, "a") as f: - f.write("\n\n") - f.write(rule_text) + for path in last_rule["resolved_outputs"]: + if self._parallel_mode: + target_lines.append(f" expand('{path}', sample=SAMPLES),") + else: + target_lines.append(f" '{path}',") + parts.append("\n".join(target_lines)) - def _snakefile_size(self) -> int: - """ - Returns current Snakefile byte length, 0 if not yet created. - """ - if self._snakefile_path.exists(): - return self._snakefile_path.stat().st_size - return 0 + for r in self._rules: + parts.append(r["text"].strip()) + + self.snakefile_path.write_text("\n\n".join(parts)) def _run_snakemake(self, targets: List[str], cores: int = 1) -> subprocess.CompletedProcess: """ @@ -356,7 +313,7 @@ def _run_snakemake(self, targets: List[str], cores: int = 1) -> subprocess.Compl "--cores", str(cores), "--snakefile", - str(self._snakefile_path), + str(self.snakefile_path), ] + targets, capture_output=True, @@ -364,150 +321,34 @@ def _run_snakemake(self, targets: List[str], cores: int = 1) -> subprocess.Compl cwd=SNAKEMAKE_WORK_DIR, ) - def _resolve_input_paths(self, input: Dict[str, Any]) -> Dict[str, str]: + def _resolve_input_paths(self, input_data: Dict[str, Any]) -> Dict[str, str]: """ Resolves input paths relative to WORK_DIR. Paths starting with 'steps/' are resolved relative to WORK_DIR. All other paths are assumed relative to WORK_DIR/input/. Supports list values for inputs that expect multiple files (e.g. index sets). """ - resolved = {} work_p = Path(SNAKEMAKE_WORK_DIR) - for k, v in input.items(): - if isinstance(v, list): - resolved_list = [] - for item in v: - item = str(item) - if item.startswith("steps/"): - resolved_list.append(str(work_p / item)) - else: - resolved_list.append(str(work_p / "input" / item)) - resolved[k] = resolved_list - else: - v = str(v) - if v.startswith("steps/"): - resolved[k] = str(work_p / v) - else: - resolved[k] = str(work_p / "input" / v) - return resolved - - def _rebuild_from_content(self, rules_to_keep: List[tuple[str, str]]): - """ - Helper to rewrite the Snakefile and update internal history based - on a list of (rule_name, rule_text) blocks. - """ - # Clear existing file - self._snakefile_path.write_text("") - - # We need to rebuild history. - # Note: This logic assumes the step directories already exist on disk. - new_history = [] - - for name, text in rules_to_keep: - pre_size = self._snakefile_size() - old_entry = next((h for h in self._history if h["rule_name"] == name), None) - if old_entry is None: - continue - with open(self._snakefile_path, "a") as f: - f.write("\n\n" + text.strip()) - new_history.append( - { - "rule_name": name, - "step_dir": old_entry["step_dir"], - "snakefile_bytes": pre_size, - } - ) - self._history = new_history - - @workflow_tool - def delete_rule(self, rule_name: str) -> Dict[str, Any]: - """ - Removes a specific rule from the Snakefile by name. - - Args: - rule_name (str): The name of the rule to delete. - Returns: - - success: True if the rule was found and removed. - """ - if not self._snakefile_path.exists(): - return {"success": False, "error": "Snakefile does not exist."} - - content = self._snakefile_path.read_text() - # Split content into blocks starting with "rule " - # Use a simple split and filter to keep rule logic together - raw_blocks = content.split("\n\n") - new_blocks = [] - found = False - - for block in raw_blocks: - if block.strip().startswith(f"rule {rule_name}:"): - found = True - continue - if block.strip(): - # Extract rule name for the rebuilder - name = block.strip().splitlines()[0].replace("rule ", "").replace(":", "").strip() - new_blocks.append((name, block)) - - if not found: - return {"success": False, "error": f"Rule '{rule_name}' not found."} - - # Handle data deletion - entry = next((h for h in self._history if h["rule_name"] == rule_name), None) - if entry: - step_p = Path(SNAKEMAKE_WORK_DIR) / entry["step_dir"] - if step_p.exists(): - shutil.rmtree(step_p) - - self._rebuild_from_content(new_blocks) - return {"success": True, "message": f"Rule '{rule_name}' removed."} - - @workflow_tool - def deduplicate_rules(self) -> Dict[str, Any]: - """ - Parses the Snakefile and finds rules with duplicate names. - It keeps only the LAST occurrence of any duplicate rule and - removes the others. This is useful for cleaning up failed retries. - - Returns: - - success: True if deduplication completed. - - removed_count: Number of duplicate rules removed. - """ - if not self._snakefile_path.exists(): - return {"success": True, "removed_count": 0} - - content = self._snakefile_path.read_text() - raw_blocks = content.split("\n\n") - # Map rule_name -> last_seen_content - unique_rules = {} - order = [] # keep track of the first time we saw a rule to preserve order - - count_before = 0 - for block in raw_blocks: - clean = block.strip() - if not clean.startswith("rule "): - continue - - count_before += 1 - name = clean.splitlines()[0].replace("rule ", "").replace(":", "").strip() - - if name not in unique_rules: - order.append(name) - unique_rules[name] = block + def resolve_item(item): + item = str(item) + if item.startswith("steps/"): + return str(work_p / item) + return str(work_p / "input" / item) + + if isinstance(input_data, dict): + resolved = {} + for k, v in input_data.items(): + if isinstance(v, list): + resolved[k] = [resolve_item(i) for i in v] + else: + resolved[k] = resolve_item(v) + return resolved - # Reconstruct blocks based on the order they first appeared - # but using the content of the last appearance. - final_blocks = [(name, unique_rules[name]) for name in order] + if isinstance(input_data, list): + return [resolve_item(i) for i in input_data] - removed_count = count_before - len(final_blocks) - self._rebuild_from_content(final_blocks) - - return { - "success": True, - "removed_count": removed_count, - "current_rule_count": len(final_blocks), - "message": f"Deduplication complete. Removed {removed_count} duplicate rules. View the snakefile to ensure required steps were not lost.", - } + return input_data def _resolve_output_paths(self, output: Dict[str, Any], step_dir: Path) -> Dict[str, str]: """ @@ -527,19 +368,23 @@ def _add_rule_lines(self, name, resolved, lines): shared logic for adding rule lines (input or output) """ lines.append(f" {name}:") - for k, v in resolved.items(): - if isinstance(v, list): - items = ", ".join(f'"{item}"' for item in v) - lines.append(f" {k}=[{items}],") - else: - lines.append(f' {k}="{v}",') + if isinstance(resolved, dict): + for k, v in resolved.items(): + if isinstance(v, list): + items = ", ".join(f'"{item}"' for item in v) + lines.append(f" {k}=[{items}],") + else: + lines.append(f' {k}="{v}",') + elif isinstance(resolved, list): + for item in resolved: + lines.append(f' "{item}",') return lines def _build_rule_lines( self, rule_name: str, - input: Dict[str, Any], - output: Dict[str, Any], + inputs: Any, + output: Any, params: Optional[Dict[str, Any]], threads: int, log: Optional[str], @@ -551,7 +396,7 @@ def _build_rule_lines( Supports list values for inputs and outputs that expect multiple files. """ # Inputs and outputs - resolved_input = self._resolve_input_paths(input) + resolved_input = self._resolve_input_paths(inputs) resolved_output = self._resolve_output_paths(output, step_dir) lines = [f"rule {rule_name}:"] lines = self._add_rule_lines("input", resolved_input, lines) @@ -581,49 +426,129 @@ def _execute( cores: int = 1, ) -> Dict[str, Any]: """ - Shared execution path for both execute_wrapper and execute_rule. + Structural execution path. Updates state, rebuilds Snakefile, and executes. """ - existing_rule_names = [h["rule_name"] for h in self._history] - if rule_name in existing_rule_names or step_dir.exists(): - return { - "success": False, - "error": f"Rule '{rule_name}' already exists. Pick a new name, or use delete_rule('{rule_name}') first.", - } - - # Validate all resolved output paths + # Path Validation + all_resolved = [] for v in resolved_output.values(): - _, err = _validate_path(v, mode="write") - if err: - return {"success": False, "error": err} + paths = v if isinstance(v, list) else [v] + for p in paths: + _, err = _validate_path(p, mode="write") + if err: + return {"success": False, "error": err} + all_resolved.append(p) + + # Update Internal State + new_rule = { + "name": rule_name, + "text": rule_text, + "step_dir": str(step_dir.relative_to(SNAKEMAKE_WORK_DIR)), + "resolved_outputs": all_resolved, + } + self._rules.append(new_rule) - pre_append_size = self._snakefile_size() + # Prep Disk and Run step_dir.mkdir(parents=True, exist_ok=True) - self._append_rule(rule_text) - self._history.append( - { - "rule_name": rule_name, - "step_dir": str(step_dir.relative_to(SNAKEMAKE_WORK_DIR)), - "snakefile_bytes": pre_append_size, - } - ) + self._rebuild_snakefile() + result = self._run_snakemake([], cores=cores) - targets = list(resolved_output.values()) - result = self._run_snakemake(targets, cores=cores) + # Failure recovery + if result.returncode != 0: + self.delete_rule(rule_name) - if result.stdout: - print(result.stdout) - if result.stderr: - print(result.stderr) return { "success": result.returncode == 0, "returncode": result.returncode, "stdout": result.stdout, "stderr": result.stderr, - "auto_rolled_back": rule_name in existing_rule_names or step_dir.exists(), - "step_dir": str(step_dir.relative_to(SNAKEMAKE_WORK_DIR)), + "step_dir": new_rule["step_dir"], "work_dir_listing": _dir_listing(Path(SNAKEMAKE_WORK_DIR) / "steps"), } + @workflow_tool + def create_sample_sheet(self, samples: List[Dict[str, str]]) -> Dict[str, Any]: + """ + Creates a samples.csv file from the provided csv data and automatically + injects the Python code into the Snakefile to load it. Call this for workflows with + multiple samples. After discovering data with list_input_dir. The first row MUST be + the header of the file. + + Args: + samples (list of dict): A list of dictionaries representing the rows. + Example: [{"sample": "P19506_1005", "r1": "P19506_1005/R1.fastq", "r2": "P19506_1005/R2.fastq"}] + Every dictionary must have the exact same keys, and one key MUST be 'sample' + Paths should be relative to the input directory. + + Returns: + Instructions on how to use the auto-generated variables (SAMPLES and samples_df) + in your subsequent execute_rule calls. + """ + keys = list(samples[0].keys()) + if "sample" not in keys: + return {"success": False, "error": "The keys must include 'sample'."} + + # Write the csv and add to Snakefile + out_path = Path(SNAKEMAKE_WORK_DIR) / "input" / "samples.csv" + try: + with open(out_path, "w", newline="") as f: + writer = csv.DictWriter(f, fieldnames=keys) + writer.writeheader() + writer.writerows(samples) + except Exception as e: + return {"success": False, "error": f"Failed to write CSV: {e}"} + + # This is currently the only thing we write here, so can just set + self._header = ( + "import pandas as pd\n" + "samples_df = pd.read_csv('input/samples.csv').set_index('sample', drop=False)\n" + "SAMPLES = samples_df['sample'].tolist()" + ) + self._parallel_mode = True + self._rebuild_snakefile() + return {"success": True, "message": "Sample sheet created and header updated."} + + @workflow_tool + def delete_rule(self, rule_name: Optional[str] = None) -> Dict[str, Any]: + """ + Removes a specific rule from the Snakefile by name. + If rule_name is None, it removes the most recent rule (Rollback). + Args: + rule_name (str): The name of the rule to delete. + Returns: + success: True if the rule was found and removed. + """ + if not self._rules: + return {"success": False, "error": "No rules exist to remove."} + + # 1. Identify the target rule + if rule_name: + target_idx = next( + (i for i, r in enumerate(self._rules) if r["name"] == rule_name), None + ) + if target_idx is None: + return {"success": False, "error": f"Rule '{rule_name}' not found in history."} + rule_to_remove = self._rules.pop(target_idx) + else: + rule_to_remove = self._rules.pop() + + # Filesystem Cleanup + step_dir_rel = rule_to_remove["step_dir"] + step_path = (Path(SNAKEMAKE_WORK_DIR) / step_dir_rel).resolve() + workdir_p = Path(SNAKEMAKE_WORK_DIR).resolve() + input_p = (workdir_p / "input").resolve() + + # Safety: Ensure we are inside work_dir/steps/ and NOT deleting the root or input + try: + if step_path.exists() and step_path.is_relative_to(workdir_p / "steps"): + if step_path != workdir_p and step_path != input_p: + shutil.rmtree(step_path) + except Exception as e: + print(f"Warning: Could not delete directory {step_path}: {e}") + + # Synchronize State + self._rebuild_snakefile() + return {"success": True, "rule_name": rule_to_remove["name"], "step_dir": step_dir_rel} + @workflow_tool def get_environment(self) -> Dict[str, Any]: """ @@ -668,11 +593,8 @@ def get_environment(self) -> Dict[str, Any]: ), }, "wrapper_version": SNAKEMAKE_WRAPPER_VERSION, - "steps_completed": len(self._history), - "history": [ - {"step": i + 1, "rule_name": h["rule_name"], "step_dir": h["step_dir"]} - for i, h in enumerate(self._history) - ], + "steps_completed": len(self.history), + "history": self.history, "available": self.available, } @@ -728,10 +650,7 @@ def list_work_dir(self) -> Dict[str, Any]: "success": True, "root": "steps/", "items": _dir_listing(steps_dir), - "history": [ - {"step": i + 1, "rule_name": h["rule_name"], "step_dir": h["step_dir"]} - for i, h in enumerate(self._history) - ], + "history": self.history, } @workflow_tool @@ -817,13 +736,29 @@ def get_wrapper_details(self, wrapper_path: str) -> Dict[str, Any]: return {"success": True, **entry} + def _next_step_dir(self, rule_name: str) -> Path: + """ + Returns the next numbered step directory path (not yet created). + """ + existing_numbers = [0] + for r in self._rules: + try: + # Extract '01' from 'steps/01_rule_name' + basename = Path(r["step_dir"]).name + existing_numbers.append(int(basename.split("_")[0])) + except (ValueError, IndexError): + pass + + next_num = max(existing_numbers) + 1 + return self._steps_dir / f"{next_num:02d}_{rule_name}" + @workflow_tool def execute_wrapper( self, rule_name: str, wrapper_path: str, - input: Dict[str, Any], - output: Dict[str, Any], + inputs: Any, + output: Any, params: Optional[Dict[str, Any]] = None, threads: int = 1, cores: int = 1, @@ -838,27 +773,31 @@ def execute_wrapper( {SHARED_EXECUTE_DOCSTRING} Always check 'success' before proceeding to the next step. - Use rollback_step to undo this step if you want to retry with - different arguments. """ - # Strip the version prefix if the agent accidentally included it - wrapper_prefix = f"{SNAKEMAKE_WRAPPER_VERSION}/" - if wrapper_path.startswith(wrapper_prefix): - wrapper_path = wrapper_path[len(wrapper_prefix) :] + if not self.available: + return {"success": False, "error": "Snakemake provider is not available."} - # Also catch hardcoded master/ branch - # TODO can this be other branches? Can we count "/" instead? - if wrapper_path.startswith("master/"): - wrapper_path = wrapper_path[7:] + # Strip the version prefix if the agent accidentally included it + for prefix in [f"{SNAKEMAKE_WRAPPER_VERSION}/", "master/"]: + if wrapper_path.startswith(prefix): + wrapper_path = wrapper_path[len(prefix) :] entry = self._index.get(wrapper_path) if not entry: return {"success": False, "error": f"Wrapper '{wrapper_path}' not found in index."} + # Check against source of truth (self._rules) + if any(r["name"] == rule_name for r in self._rules): + return { + "success": False, + "error": f"Rule '{rule_name}' already exists. Pick a new name, or use delete_rule('{rule_name}') first.", + } + step_dir = self._next_step_dir(rule_name) lines, resolved_output = self._build_rule_lines( - rule_name, input, output, params, threads, log, step_dir + rule_name, inputs, output, params, threads, log, step_dir ) + directive = "meta_wrapper" if entry["type"] == "meta_wrapper" else "wrapper" lines.append(f' {directive}: "{SNAKEMAKE_WRAPPER_VERSION}/{wrapper_path}"') rule_text = "\n".join(lines) @@ -869,8 +808,8 @@ def execute_wrapper( def execute_rule( self, rule_name: str, - input: Dict[str, Any], - output: Dict[str, Any], + inputs: Any, + output: Any, params: Optional[Dict[str, Any]] = None, threads: int = 1, cores: int = 1, @@ -878,6 +817,7 @@ def execute_rule( shell: Optional[str] = None, run: Optional[str] = None, script: Optional[str] = None, + conda_packages: Optional[List[str]] = None, ) -> Dict[str, Any]: f""" Executes a single custom workflow step written by the agent. Use this @@ -905,11 +845,11 @@ def execute_rule( automatically. Exactly one of shell, run, or script must be provided. + conda_packages (list, optional): list of conda package names. + {SHARED_EXECUTE_DOCSTRING} Always check 'success' before proceeding to the next step. - Use rollback_step to undo this step if you want to retry with - different arguments. """ if not self.available: return {"success": False, "error": "Snakemake provider is not available."} @@ -921,11 +861,22 @@ def execute_rule( "error": "Exactly one of 'shell', 'run', or 'script' must be provided.", } + # Check against source of truth (self._rules) + if any(r["name"] == rule_name for r in self._rules): + return { + "success": False, + "error": f"Rule '{rule_name}' already exists. Pick a new name, or use delete_rule('{rule_name}') first.", + } + step_dir = self._next_step_dir(rule_name) lines, resolved_output = self._build_rule_lines( - rule_name, input, output, params, threads, log, step_dir + rule_name, inputs, output, params, threads, log, step_dir ) + if conda_packages: + env_yaml_path = self.write_conda_environment(step_dir, conda_packages) + lines.append(f' conda: "{env_yaml_path.relative_to(SNAKEMAKE_WORK_DIR)}"') + if shell is not None: lines.append(f' shell: "{shell}"') elif script is not None: @@ -949,11 +900,11 @@ def view_snakefile(self) -> Dict[str, Any]: - content (str): The full text of the Snakefile - rule_count (int): Number of rules currently defined """ - if not self._snakefile_path.exists(): + if not self.snakefile_path.exists(): return {"success": True, "content": "", "rule_count": 0} try: - content = self._snakefile_path.read_text() + content = self.snakefile_path.read_text() # Basic count by looking for the "rule " keyword at the start of lines rule_count = len( [line for line in content.splitlines() if line.strip().startswith("rule ")] @@ -962,53 +913,30 @@ def view_snakefile(self) -> Dict[str, Any]: except Exception as e: return {"success": False, "error": str(e)} + def write_conda_environment(self, step_dir, packages): + """ + Write one or more conda packages to a step directory. + """ + step_dir.mkdir(parents=True, exist_ok=True) + path = step_dir / "env.yaml" + env_content = { + "channels": ["conda-forge", "bioconda", "nodefaults"], + "dependencies": packages, + } + utils.write_yaml(env_content, path) + return path + + def rule_exists(self, step_dir, rule_name): + """ + Shared function to determine if a rule exists. + """ + existing_rule_names = [h["rule_name"] for h in self.history] + return rule_name in existing_rule_names or step_dir.exists() + def rollback_step(self) -> Dict[str, Any]: """ Rolls back the most recently executed step. Removes the step's output directory from WORK_DIR/steps/, truncates the Snakefile to remove the appended rule, and pops the step from history. - - Takes no arguments. Always rolls back exactly one step (the most recent). - - By default, when a step fails, we roll back for you. You must ONLY call - this tool when you want an additional rollback. Use this when you want to - redo a step that was not rolled back. You MUST inspect the Snakefile to - confirm first. - - Returns a dictionary containing: - - success (bool): True if rollback completed successfully - - rolled_back (str): rule_name of the step that was removed - - step_dir_removed (str): the step directory that was deleted, - relative to WORK_DIR - - snakefile_truncated_to_bytes (int): Snakefile size after truncation - - steps_remaining (int): number of steps still in history - - history: updated ordered list of remaining steps - """ - if not self._history: - return {"success": False, "error": "No steps to roll back."} - - entry = self._history.pop() - rule_name = entry["rule_name"] - step_dir = Path(SNAKEMAKE_WORK_DIR) / entry["step_dir"] - pre_size = entry["snakefile_bytes"] - - # Remove step output directory - if step_dir.exists(): - shutil.rmtree(step_dir) - - # Truncate Snakefile back to pre-append size - if self._snakefile_path.exists(): - with open(self._snakefile_path, "r+") as f: - f.truncate(pre_size) - - return { - "success": True, - "rolled_back": rule_name, - "step_dir_removed": str(entry["step_dir"]), - "snakefile_truncated_to_bytes": pre_size, - "steps_remaining": len(self._history), - "history": [ - {"step": i + 1, "rule_name": h["rule_name"], "step_dir": h["step_dir"]} - for i, h in enumerate(self._history) - ], - } + """ + return self.delete_rule(None) From 83de528482016d366b7d5c317ac21643f91a6526 Mon Sep 17 00:00:00 2001 From: vsoch Date: Thu, 28 May 2026 09:58:14 -0700 Subject: [PATCH 5/6] look at changes Signed-off-by: vsoch --- .../providers/workflow/snakemake.py | 48 +++++++------------ 1 file changed, 16 insertions(+), 32 deletions(-) diff --git a/resource_secretary/providers/workflow/snakemake.py b/resource_secretary/providers/workflow/snakemake.py index 588102c..f1ffcff 100644 --- a/resource_secretary/providers/workflow/snakemake.py +++ b/resource_secretary/providers/workflow/snakemake.py @@ -321,47 +321,31 @@ def _run_snakemake(self, targets: List[str], cores: int = 1) -> subprocess.Compl cwd=SNAKEMAKE_WORK_DIR, ) - def _resolve_input_paths(self, input_data: Dict[str, Any]) -> Dict[str, str]: + def _apply_path_resolution(self, data: Any, resolver_func) -> Any: """ - Resolves input paths relative to WORK_DIR. - Paths starting with 'steps/' are resolved relative to WORK_DIR. - All other paths are assumed relative to WORK_DIR/input/. - Supports list values for inputs that expect multiple files (e.g. index sets). + Recursively traverses dicts/lists to apply a resolution function to path strings. """ - work_p = Path(SNAKEMAKE_WORK_DIR) + if isinstance(data, dict): + return {k: self._apply_path_resolution(v, resolver_func) for k, v in data.items()} + if isinstance(data, list): + return [self._apply_path_resolution(i, resolver_func) for i in data] + return resolver_func(str(data)) - def resolve_item(item): - item = str(item) + def _resolve_input_paths(self, input_data: Dict[str, Any]) -> Dict[str, str]: + """ + Resolve inputs relative to snakemake working directory + """ + work_p = Path(SNAKEMAKE_WORK_DIR) + def input_resolver(item: str): if item.startswith("steps/"): return str(work_p / item) return str(work_p / "input" / item) - if isinstance(input_data, dict): - resolved = {} - for k, v in input_data.items(): - if isinstance(v, list): - resolved[k] = [resolve_item(i) for i in v] - else: - resolved[k] = resolve_item(v) - return resolved - - if isinstance(input_data, list): - return [resolve_item(i) for i in input_data] - - return input_data + return self._apply_path_resolution(input_data, input_resolver) def _resolve_output_paths(self, output: Dict[str, Any], step_dir: Path) -> Dict[str, str]: - """ - Resolves output paths relative to the step's output directory. - Supports list values for outputs that produce multiple files. - """ - resolved = {} - for k, v in output.items(): - if isinstance(v, list): - resolved[k] = [str(step_dir / str(item)) for item in v] - else: - resolved[k] = str(step_dir / str(v)) - return resolved + return self._apply_path_resolution(output, lambda item: str(step_dir / item)) + def _add_rule_lines(self, name, resolved, lines): """ From 2a56dbfdc8f3183e41d9ae5671e828a71cc1ae3b Mon Sep 17 00:00:00 2001 From: vsoch Date: Mon, 1 Jun 2026 18:17:25 -0700 Subject: [PATCH 6/6] restore snakemake without shared output/input Signed-off-by: vsoch --- .../providers/workflow/snakemake.py | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/resource_secretary/providers/workflow/snakemake.py b/resource_secretary/providers/workflow/snakemake.py index f1ffcff..588102c 100644 --- a/resource_secretary/providers/workflow/snakemake.py +++ b/resource_secretary/providers/workflow/snakemake.py @@ -321,31 +321,47 @@ def _run_snakemake(self, targets: List[str], cores: int = 1) -> subprocess.Compl cwd=SNAKEMAKE_WORK_DIR, ) - def _apply_path_resolution(self, data: Any, resolver_func) -> Any: - """ - Recursively traverses dicts/lists to apply a resolution function to path strings. - """ - if isinstance(data, dict): - return {k: self._apply_path_resolution(v, resolver_func) for k, v in data.items()} - if isinstance(data, list): - return [self._apply_path_resolution(i, resolver_func) for i in data] - return resolver_func(str(data)) - def _resolve_input_paths(self, input_data: Dict[str, Any]) -> Dict[str, str]: """ - Resolve inputs relative to snakemake working directory + Resolves input paths relative to WORK_DIR. + Paths starting with 'steps/' are resolved relative to WORK_DIR. + All other paths are assumed relative to WORK_DIR/input/. + Supports list values for inputs that expect multiple files (e.g. index sets). """ - work_p = Path(SNAKEMAKE_WORK_DIR) - def input_resolver(item: str): + work_p = Path(SNAKEMAKE_WORK_DIR) + + def resolve_item(item): + item = str(item) if item.startswith("steps/"): return str(work_p / item) return str(work_p / "input" / item) - return self._apply_path_resolution(input_data, input_resolver) + if isinstance(input_data, dict): + resolved = {} + for k, v in input_data.items(): + if isinstance(v, list): + resolved[k] = [resolve_item(i) for i in v] + else: + resolved[k] = resolve_item(v) + return resolved + + if isinstance(input_data, list): + return [resolve_item(i) for i in input_data] - def _resolve_output_paths(self, output: Dict[str, Any], step_dir: Path) -> Dict[str, str]: - return self._apply_path_resolution(output, lambda item: str(step_dir / item)) + return input_data + def _resolve_output_paths(self, output: Dict[str, Any], step_dir: Path) -> Dict[str, str]: + """ + Resolves output paths relative to the step's output directory. + Supports list values for outputs that produce multiple files. + """ + resolved = {} + for k, v in output.items(): + if isinstance(v, list): + resolved[k] = [str(step_dir / str(item)) for item in v] + else: + resolved[k] = str(step_dir / str(v)) + return resolved def _add_rule_lines(self, name, resolved, lines): """