55
66from abc import abstractmethod
77import os
8- import sys
9- from typing import Optional
10- from snakemake_interface_executor_plugins import ExecutorSettingsBase
11- from snakemake_interface_executor_plugins .dag import DAGExecutorInterface
128from snakemake_interface_executor_plugins .executors .base import AbstractExecutor
139from snakemake_interface_executor_plugins .logging import LoggerExecutorInterface
14- from snakemake_interface_executor_plugins .persistence import StatsExecutorInterface
10+ from snakemake_interface_executor_plugins .settings import ExecMode
1511from snakemake_interface_executor_plugins .utils import (
1612 encode_target_jobs_cli_args ,
1713 format_cli_arg ,
1814 join_cli_args ,
19- lazy_property ,
2015)
2116from snakemake_interface_executor_plugins .jobs import ExecutorJobInterface
2217from snakemake_interface_executor_plugins .workflow import WorkflowExecutorInterface
@@ -26,29 +21,33 @@ class RealExecutor(AbstractExecutor):
2621 def __init__ (
2722 self ,
2823 workflow : WorkflowExecutorInterface ,
29- dag : DAGExecutorInterface ,
30- stats : StatsExecutorInterface ,
3124 logger : LoggerExecutorInterface ,
32- executor_settings : Optional [ExecutorSettingsBase ],
33- job_core_limit : Optional [int ] = None ,
25+ pass_default_remote_provider_args : bool = True ,
26+ pass_default_resources_args : bool = True ,
27+ pass_envvar_declarations_to_cmd : bool = True ,
3428 ):
3529 super ().__init__ (
3630 workflow ,
37- dag ,
31+ logger ,
3832 )
39- self .cores = job_core_limit if job_core_limit else "all"
40- self .executor_settings = executor_settings
41- self .assume_shared_fs = workflow .assume_shared_fs
42- self .stats = stats
43- self .logger = logger
33+ self .executor_settings = self .workflow .executor_settings
4434 self .snakefile = workflow .main_snakefile
35+ self .pass_default_remote_provider_args = pass_default_remote_provider_args
36+ self .pass_default_resources_args = pass_default_resources_args
37+ self .pass_envvar_declarations_to_cmd = pass_envvar_declarations_to_cmd
38+
39+ @property
40+ @abstractmethod
41+ def cores (self ):
42+ # return "all" in case of remote executors,
43+ # otherwise self.workflow.resource_settings.cores
44+ ...
4545
4646 def register_job (self , job : ExecutorJobInterface ):
4747 job .register ()
4848
4949 def _run (self , job : ExecutorJobInterface , callback = None , error_callback = None ):
5050 super ()._run (job )
51- self .stats .report_job_start (job )
5251
5352 try :
5453 self .register_job (job )
@@ -74,96 +73,11 @@ def handle_job_success(
7473 handle_log = handle_log ,
7574 handle_touch = handle_touch ,
7675 ignore_missing_output = ignore_missing_output ,
77- latency_wait = self .latency_wait ,
78- assume_shared_fs = self .assume_shared_fs ,
79- keep_metadata = self .workflow .keep_metadata ,
8076 )
81- self .stats .report_job_end (job )
8277
8378 def handle_job_error (self , job : ExecutorJobInterface , upload_remote = True ):
8479 job .postprocess (
8580 error = True ,
86- assume_shared_fs = self .assume_shared_fs ,
87- latency_wait = self .latency_wait ,
88- )
89-
90- def workflow_property_to_arg (
91- self , property , flag = None , quote = True , skip = False , invert = False , attr = None
92- ):
93- if skip :
94- return ""
95-
96- value = getattr (self .workflow , property )
97-
98- if value is not None and attr is not None :
99- value = getattr (value , attr )
100-
101- if flag is None :
102- flag = f"--{ property .replace ('_' , '-' )} "
103-
104- if invert and isinstance (value , bool ):
105- value = not value
106-
107- return format_cli_arg (flag , value , quote = quote )
108-
109- @lazy_property
110- def general_args (self ):
111- """Return a string to add to self.exec_job that includes additional
112- arguments from the command line. This is currently used in the
113- ClusterExecutor and CPUExecutor, as both were using the same
114- code. Both have base class of the RealExecutor.
115- """
116- w2a = self .workflow_property_to_arg
117-
118- return join_cli_args (
119- [
120- "--force" ,
121- "--keep-target-files" ,
122- "--keep-remote" ,
123- "--max-inventory-time 0" ,
124- "--nocolor" ,
125- "--notemp" ,
126- "--no-hooks" ,
127- "--nolock" ,
128- "--ignore-incomplete" ,
129- format_cli_arg ("--keep-incomplete" , self .keepincomplete ),
130- w2a ("rerun_triggers" ),
131- w2a ("cleanup_scripts" , flag = "--skip-script-cleanup" ),
132- w2a ("shadow_prefix" ),
133- w2a ("use_conda" ),
134- w2a ("conda_frontend" ),
135- w2a ("conda_prefix" ),
136- w2a ("conda_base_path" , skip = not self .assume_shared_fs ),
137- w2a ("use_singularity" ),
138- w2a ("singularity_prefix" ),
139- w2a ("singularity_args" ),
140- w2a ("execute_subworkflows" , flag = "--no-subworkflows" , invert = True ),
141- w2a ("max_threads" ),
142- w2a ("use_env_modules" , flag = "--use-envmodules" ),
143- w2a ("keep_metadata" , flag = "--drop-metadata" , invert = True ),
144- w2a ("wrapper_prefix" ),
145- w2a ("overwrite_threads" , flag = "--set-threads" ),
146- w2a ("overwrite_scatter" , flag = "--set-scatter" ),
147- w2a ("local_groupid" , skip = self .job_specific_local_groupid ),
148- w2a ("conda_not_block_search_path_envvars" ),
149- w2a ("overwrite_configfiles" , flag = "--configfiles" ),
150- w2a ("config_args" , flag = "--config" ),
151- w2a ("printshellcmds" ),
152- w2a ("latency_wait" ),
153- w2a ("scheduler_type" , flag = "--scheduler" ),
154- format_cli_arg (
155- "--scheduler-solver-path" ,
156- os .path .dirname (sys .executable ),
157- skip = not self .assume_shared_fs ,
158- ),
159- self .get_set_resources_args (),
160- self .get_default_remote_provider_args (),
161- self .get_default_resources_args (),
162- self .get_resource_scopes_args (),
163- self .get_workdir_arg (),
164- join_cli_args (self .additional_general_args ()),
165- format_cli_arg ("--mode" , self .get_exec_mode ()),
166- ]
16781 )
16882
16983 def additional_general_args (self ):
@@ -212,11 +126,17 @@ def get_python_executable(self):
212126 ...
213127
214128 @abstractmethod
215- def get_exec_mode (self ):
129+ def get_exec_mode (self ) -> ExecMode :
216130 ...
217131
218132 def get_envvar_declarations (self ):
219- return ""
133+ if self .pass_envvar_declarations_to_cmd :
134+ return " " .join (
135+ f"{ var } ={ repr (os .environ [var ])} "
136+ for var in self .workflow .remote_execution_settings .envvars
137+ )
138+ else :
139+ return ""
220140
221141 def get_job_exec_prefix (self , job : ExecutorJobInterface ):
222142 return ""
@@ -231,6 +151,10 @@ def format_job_exec(self, job: ExecutorJobInterface):
231151 suffix = self .get_job_exec_suffix (job )
232152 if suffix :
233153 suffix = f"&& { suffix } "
154+ general_args = self .workflow .spawned_job_args_factory .general_args (
155+ pass_default_remote_provider_args = self .pass_default_remote_provider_args ,
156+ pass_default_resources_args = self .pass_default_resources_args ,
157+ )
234158 return join_cli_args (
235159 [
236160 prefix ,
@@ -239,7 +163,16 @@ def format_job_exec(self, job: ExecutorJobInterface):
239163 "-m snakemake" ,
240164 format_cli_arg ("--snakefile" , self .get_snakefile ()),
241165 self .get_job_args (job ),
242- self .general_args ,
166+ self .get_default_remote_provider_args (),
167+ self .get_workdir_arg (),
168+ general_args ,
169+ self .additional_general_args (),
170+ format_cli_arg ("--mode" , self .get_exec_mode ()),
171+ format_cli_arg (
172+ "--local-groupid" ,
173+ self .workflow .group_settings .local_groupid ,
174+ skip = self .job_specific_local_groupid ,
175+ ),
243176 suffix ,
244177 ]
245178 )
0 commit comments