Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ jobs:
with:
environments: dev
pixi-version: v0.42.1
cache: true

- name: Ruff Format
if: always()
Expand All @@ -35,10 +36,11 @@ jobs:
if: always()
run: |
pixi run --environment dev lint --diff
# - name: Mypy
# if: always()
# run: |
# pixi run --environment dev type-check

- name: Mypy
if: always()
run: |
pixi run --environment dev type-check

- name: Collect QC
run: echo "All quality control checks passed"
Expand All @@ -53,6 +55,7 @@ jobs:
with:
environments: dev
pixi-version: v0.42.1
cache: true

- name: Run tests
run: pixi run --environment dev test --show-capture=all -s -vv
7 changes: 6 additions & 1 deletion snakemake_interface_executor_plugins/executors/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,18 @@ class SubmittedJobInfo:


class AbstractExecutor(ABC):
workflow: WorkflowExecutorInterface
logger: LoggerExecutorInterface

def __init__(
self,
workflow: WorkflowExecutorInterface,
logger: LoggerExecutorInterface,
):
self.workflow = workflow
self.dag = workflow.dag
self.dag = (
workflow.dag
) # we cant type annotate this because dag type is in snakemake
self.logger = logger

def get_resource_declarations_dict(self, job: JobExecutorInterface):
Expand Down
12 changes: 9 additions & 3 deletions snakemake_interface_executor_plugins/executors/real.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
__license__ = "MIT"

from abc import abstractmethod
from typing import Dict
from typing import Mapping
from snakemake_interface_executor_plugins.executors.base import (
AbstractExecutor,
SubmittedJobInfo,
)
from snakemake_interface_executor_plugins.logging import LoggerExecutorInterface
from snakemake_interface_executor_plugins.settings import ExecMode
from snakemake_interface_executor_plugins.settings import ExecMode, ExecutorSettingsBase
from snakemake_interface_executor_plugins.utils import (
encode_target_jobs_cli_args,
format_cli_arg,
Expand All @@ -21,6 +21,12 @@


class RealExecutor(AbstractExecutor):
# Class attributes with type annotations
workflow: WorkflowExecutorInterface
logger: LoggerExecutorInterface
executor_settings: ExecutorSettingsBase
snakefile: str

def __init__(
self,
workflow: WorkflowExecutorInterface,
Expand Down Expand Up @@ -176,5 +182,5 @@ def format_job_exec(self, job: JobExecutorInterface) -> str:
)
return args

def envvars(self) -> Dict[str, str]:
def envvars(self) -> Mapping[str, str]:
return self.workflow.spawned_job_args_factory.envvars()
21 changes: 17 additions & 4 deletions snakemake_interface_executor_plugins/executors/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import sys
import tempfile
import threading
from typing import Generator, List
from typing import Generator, List, Optional
from snakemake_interface_common.exceptions import WorkflowError
from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
from snakemake_interface_executor_plugins.executors.real import RealExecutor
Expand All @@ -36,7 +36,20 @@ class RemoteExecutor(RealExecutor, ABC):
also for the cloud.
"""

default_jobscript = "jobscript.sh"
# Class attributes with type annotations
default_jobscript: str = "jobscript.sh"
_next_seconds_between_status_checks: Optional[int]
max_status_checks_per_second: float
jobname: str
snakefile: str
is_default_jobscript: bool
jobscript: str
_tmpdir: Optional[str]
active_jobs: List[SubmittedJobInfo]
lock: threading.Lock
wait: bool
wait_thread: threading.Thread
status_rate_limiter: Throttler

def __init__(
self,
Expand Down Expand Up @@ -118,7 +131,7 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
...

def get_exec_mode(self) -> ExecMode:
return ExecMode.REMOTE
return ExecMode.REMOTE # type: ignore

def get_python_executable(self):
return (
Expand All @@ -128,7 +141,7 @@ def get_python_executable(self):
else "python"
)

def get_job_args(self, job: JobExecutorInterface):
def get_job_args(self, job: JobExecutorInterface, **kwargs):
waitfiles_parameter = ""
if SharedFSUsage.INPUT_OUTPUT in self.workflow.storage_settings.shared_fs_usage:
wait_for_files = []
Expand Down
5 changes: 3 additions & 2 deletions snakemake_interface_executor_plugins/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@

from abc import ABC, abstractmethod
import sys
from typing import Any, Iterable, Mapping, Optional, Sequence, Union
from typing import Any, Iterable, Mapping, Optional, Sequence, Union, List

from snakemake_interface_common.rules import RuleInterface
from snakemake_interface_executor_plugins.utils import TargetSpec


class JobExecutorInterface(ABC):
Expand Down Expand Up @@ -58,7 +59,7 @@ def output(self) -> Iterable[str]: ...
def register(self, external_jobid: Optional[str] = None) -> None: ...

@abstractmethod
def get_target_spec(self) -> str: ...
def get_target_spec(self) -> List[TargetSpec]: ...

@abstractmethod
def rules(self) -> Iterable[RuleInterface]: ...
Expand Down
6 changes: 6 additions & 0 deletions snakemake_interface_executor_plugins/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@
__license__ = "MIT"

from abc import ABC, abstractmethod
from typing import Callable
from snakemake_interface_executor_plugins.jobs import JobExecutorInterface


class JobSchedulerExecutorInterface(ABC):
submit_callback: Callable[[JobExecutorInterface], None]
finish_callback: Callable[[JobExecutorInterface], None]
error_callback: Callable[[JobExecutorInterface], None]

@abstractmethod
def executor_error_callback(self, exception: Exception) -> None: ...
9 changes: 9 additions & 0 deletions snakemake_interface_executor_plugins/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
GroupSettingsExecutorInterface,
RemoteExecutionSettingsExecutorInterface,
StorageSettingsExecutorInterface,
ExecutorSettingsBase,
)


Expand Down Expand Up @@ -53,6 +54,10 @@ def group_settings(self) -> GroupSettingsExecutorInterface: ...
@abstractmethod
def executor_plugin(self) -> Optional[Plugin]: ...

@property
@abstractmethod
def executor_settings(self) -> ExecutorSettingsBase: ...

@property
@abstractmethod
def resource_scopes(self): ...
Expand All @@ -72,3 +77,7 @@ def workdir_init(self): ...
@property
@abstractmethod
def scheduler(self) -> JobSchedulerExecutorInterface: ...

@property
@abstractmethod
def dag(self): ...
Loading