12
12
import tempfile
13
13
import threading
14
14
from functools import cmp_to_key , partial
15
- from typing import (Any , Callable , Dict , Generator , List , Mapping , MutableMapping ,
16
- MutableSequence , Optional , Set , Union , cast )
15
+ from typing import (Any , Callable , Dict , Generator , IO , List , Mapping ,
16
+ MutableMapping , MutableSequence , Optional , Set , Union , cast )
17
17
18
18
from typing_extensions import Text , Type , TYPE_CHECKING # pylint: disable=unused-import
19
19
# move to a regular typing import when Python 3.3-3.6 is no longer supported
@@ -83,8 +83,8 @@ def __init__(self,
83
83
builder , # type: Builder
84
84
script , # type: Dict[Text, Text]
85
85
output_callback , # type: Callable[[Any, Any], Any]
86
- requirements , # type: Dict[Text, Text]
87
- hints , # type: Dict[Text, Text]
86
+ requirements , # type: List[ Dict[Text, Text] ]
87
+ hints , # type: List[ Dict[Text, Text] ]
88
88
outdir = None , # type: Optional[Text]
89
89
tmpdir = None , # type: Optional[Text]
90
90
): # type: (...) -> None
@@ -101,7 +101,7 @@ def __init__(self,
101
101
102
102
def run (self ,
103
103
runtimeContext , # type: RuntimeContext
104
- tmpdir_lock = None # type: threading.Lock
104
+ tmpdir_lock = None # type: Optional[ threading.Lock]
105
105
): # type: (...) -> None
106
106
try :
107
107
normalizeFilesDirs (self .builder .job )
@@ -194,8 +194,10 @@ def __init__(self, job, output_callback, cachebuilder, jobcache):
194
194
self .outdir = jobcache
195
195
self .prov_obj = None # type: Optional[ProvenanceProfile]
196
196
197
- def run (self , runtimeContext ):
198
- # type: (RuntimeContext) -> None
197
+ def run (self ,
198
+ runtimeContext , # type: RuntimeContext
199
+ tmpdir_lock = None # type: Optional[threading.Lock]
200
+ ): # type: (...) -> None
199
201
self .output_callback (self .job .collect_output_ports (
200
202
self .job .tool ["outputs" ],
201
203
self .cachebuilder ,
@@ -232,7 +234,7 @@ def check_adjust(builder, file_o):
232
234
file_o ["basename" ]))
233
235
return file_o
234
236
235
- def check_valid_locations (fs_access , ob ):
237
+ def check_valid_locations (fs_access , ob ): # type: (StdFsAccess, Dict[Text, Any]) -> None
236
238
if ob ["location" ].startswith ("_:" ):
237
239
pass
238
240
if ob ["class" ] == "File" and not fs_access .isfile (ob ["location" ]):
@@ -285,7 +287,7 @@ def make_path_mapper(self, reffiles, stagedir, runtimeContext, separateDirs):
285
287
return PathMapper (reffiles , runtimeContext .basedir , stagedir , separateDirs )
286
288
287
289
def updatePathmap (self , outdir , pathmap , fn ):
288
- # type: (Text, PathMapper, Dict) -> None
290
+ # type: (Text, PathMapper, Dict[Text, Any] ) -> None
289
291
if "location" in fn and fn ["location" ] in pathmap :
290
292
pathmap .update (fn ["location" ], pathmap .mapper (fn ["location" ]).resolved ,
291
293
os .path .join (outdir , fn ["basename" ]),
@@ -298,7 +300,7 @@ def updatePathmap(self, outdir, pathmap, fn):
298
300
def job (self ,
299
301
job_order , # type: Mapping[Text, Text]
300
302
output_callbacks , # type: Callable[[Any, Any], Any]
301
- runtimeContext # RuntimeContext
303
+ runtimeContext # type: RuntimeContext
302
304
):
303
305
# type: (...) -> Generator[Union[JobBase, CallbackJob], None, None]
304
306
@@ -332,9 +334,9 @@ def job(self,
332
334
if dockerimg is not None :
333
335
cmdline = ["docker" , "run" , dockerimg ] + cmdline
334
336
# not really run using docker, just for hashing purposes
335
- keydict = {u"cmdline" : cmdline }
337
+ keydict = {u"cmdline" : cmdline } # type: Dict[Text, Union[Dict[Text, Any], List[Any]]]
336
338
337
- for shortcut in ["stdout " , "stderr" ]: # later, add "stdin"
339
+ for shortcut in ["stdin " , "stdout" , "stderr" ]:
338
340
if shortcut in self .tool :
339
341
keydict [shortcut ] = self .tool [shortcut ]
340
342
@@ -409,8 +411,12 @@ def job(self,
409
411
runtimeContext = runtimeContext .copy ()
410
412
runtimeContext .outdir = jobcache
411
413
412
- def update_status_output_callback (output_callbacks , jobcachelock ,
413
- outputs , processStatus ):
414
+ def update_status_output_callback (
415
+ output_callbacks , # type: Callable[[List[Dict[Text, Any]], Text], None]
416
+ jobcachelock , # type: IO[Any]
417
+ outputs , # type: List[Dict[Text, Any]]
418
+ processStatus # type: Text
419
+ ): # type: (...) -> None
414
420
# save status to the lockfile then release the lock
415
421
jobcachelock .seek (0 )
416
422
jobcachelock .truncate ()
@@ -556,13 +562,15 @@ def update_status_output_callback(output_callbacks, jobcachelock,
556
562
muts = set () # type: Set[Text]
557
563
558
564
if builder .mutation_manager is not None :
559
- def register_mut (f ):
565
+ def register_mut (f ): # type: (Dict[Text, Any]) -> None
566
+ mm = cast (MutationManager , builder .mutation_manager )
560
567
muts .add (f ["location" ])
561
- builder . mutation_manager .register_mutation (j .name , f )
568
+ mm .register_mutation (j .name , f )
562
569
563
- def register_reader (f ):
570
+ def register_reader (f ): # type: (Dict[Text, Any]) -> None
571
+ mm = cast (MutationManager , builder .mutation_manager )
564
572
if f ["location" ] not in muts :
565
- builder . mutation_manager .register_reader (j .name , f )
573
+ mm .register_reader (j .name , f )
566
574
readers [f ["location" ]] = copy .deepcopy (f )
567
575
568
576
for li in j .generatefiles ["listing" ]:
@@ -628,7 +636,7 @@ def collect_output_ports(self,
628
636
rcode , # type: int
629
637
compute_checksum = True , # type: bool
630
638
jobname = "" , # type: Text
631
- readers = None # type: Dict[Text, Any]
639
+ readers = None # type: Optional[ Dict[Text, Any] ]
632
640
): # type: (...) -> OutputPorts
633
641
ret = {} # type: OutputPorts
634
642
debug = _logger .isEnabledFor (logging .DEBUG )
@@ -647,11 +655,12 @@ def collect_output_ports(self,
647
655
json_dumps (ret , indent = 4 ))
648
656
else :
649
657
for i , port in enumerate (ports ):
650
- def makeWorkflowException (msg ):
651
- return WorkflowException (
652
- u"Error collecting output for parameter '%s':\n %s"
653
- % (shortname (port ["id" ]), msg ))
654
- with SourceLine (ports , i , makeWorkflowException , debug ):
658
+ class ParameterOutputWorkflowException (WorkflowException ):
659
+ def __init__ (self , msg , ** kwargs ): # type: (Text, **Any) -> None
660
+ super (ParameterOutputWorkflowException , self ).__init__ (
661
+ u"Error collecting output for parameter '%s':\n %s"
662
+ % (shortname (port ["id" ]), msg ), kwargs )
663
+ with SourceLine (ports , i , ParameterOutputWorkflowException , debug ):
655
664
fragment = shortname (port ["id" ])
656
665
ret [fragment ] = self .collect_output (port , builder , outdir , fs_access ,
657
666
compute_checksum = compute_checksum )
0 commit comments