|
| 1 | +import nipype |
| 2 | +import attrs |
| 3 | +import typing as ty |
| 4 | +from pydra.compose import base |
| 5 | +from pydra.compose.base.builder import build_task_class |
| 6 | +from pydra.utils.general import task_fields, task_dict |
| 7 | +from fileformats.generic import File, Directory, FileSet |
| 8 | +import nipype.interfaces.base.traits_extension |
| 9 | +from pydra.engine.job import Job |
| 10 | +from pydra.utils.typing import is_fileset_or_union |
| 11 | + |
| 12 | + |
| 13 | +__all__ = ["define", "arg", "out", "Task", "Outputs"] |
| 14 | + |
| 15 | + |
| 16 | +class arg(base.Arg): |
| 17 | + """Argument of a Python task |
| 18 | +
|
| 19 | + Parameters |
| 20 | + ---------- |
| 21 | + help: str |
| 22 | + A short description of the input field. |
| 23 | + default : Any, optional |
| 24 | + the default value for the argument |
| 25 | + allowed_values: list, optional |
| 26 | + List of allowed values for the field. |
| 27 | + requires: list, optional |
| 28 | + Names of the inputs that are required together with the field. |
| 29 | + copy_mode: File.CopyMode, optional |
| 30 | + The mode of copying the file, by default it is File.CopyMode.any |
| 31 | + copy_collation: File.CopyCollation, optional |
| 32 | + The collation of the file, by default it is File.CopyCollation.any |
| 33 | + copy_ext_decomp: File.ExtensionDecomposition, optional |
| 34 | + The extension decomposition of the file, by default it is |
| 35 | + File.ExtensionDecomposition.single |
| 36 | + readonly: bool, optional |
| 37 | + If True the input field can’t be provided by the user but it aggregates other |
| 38 | + input fields (for example the fields with argstr: -o {fldA} {fldB}), by default |
| 39 | + it is False |
| 40 | + type: type, optional |
| 41 | + The type of the field, by default it is Any |
| 42 | + name: str, optional |
| 43 | + The name of the field, used when specifying a list of fields instead of a mapping |
| 44 | + from name to field, by default it is None |
| 45 | + """ |
| 46 | + |
| 47 | + |
| 48 | +class out(base.Out): |
| 49 | + """Output of a Python task |
| 50 | +
|
| 51 | + Parameters |
| 52 | + ---------- |
| 53 | + name: str, optional |
| 54 | + The name of the field, used when specifying a list of fields instead of a mapping |
| 55 | + from name to field, by default it is None |
| 56 | + type: type, optional |
| 57 | + The type of the field, by default it is Any |
| 58 | + help: str, optional |
| 59 | + A short description of the input field. |
| 60 | + requires: list, optional |
| 61 | + Names of the inputs that are required together with the field. |
| 62 | + converter: callable, optional |
| 63 | + The converter for the field passed through to the attrs.field, by default it is None |
| 64 | + validator: callable | iterable[callable], optional |
| 65 | + The validator(s) for the field passed through to the attrs.field, by default it is None |
| 66 | + position : int |
| 67 | + The position of the output in the output list, allows for tuple unpacking of |
| 68 | + outputs |
| 69 | + """ |
| 70 | + |
| 71 | + |
| 72 | +def define(interface: nipype.interfaces.base.BaseInterface) -> "Task": |
| 73 | + """ |
| 74 | + Create an interface for a function or a class. |
| 75 | +
|
| 76 | + Parameters |
| 77 | + ---------- |
| 78 | + wrapped : type | callable | None |
| 79 | + The function or class to create an interface for. |
| 80 | + inputs : list[str | Arg] | dict[str, Arg | type] | None |
| 81 | + The inputs to the function or class. |
| 82 | + outputs : list[str | base.Out] | dict[str, base.Out | type] | type | None |
| 83 | + The outputs of the function or class. |
| 84 | + auto_attribs : bool |
| 85 | + Whether to use auto_attribs mode when creating the class. |
| 86 | + xor: Sequence[str | None] | Sequence[Sequence[str | None]], optional |
| 87 | + Names of args that are exclusive mutually exclusive, which must include |
| 88 | + the name of the current field. If this list includes None, then none of the |
| 89 | + fields need to be set. |
| 90 | +
|
| 91 | + Returns |
| 92 | + ------- |
| 93 | + Task |
| 94 | + The task class for the Python function |
| 95 | + """ |
| 96 | + inputs = traitedspec_to_fields( |
| 97 | + interface.inputs, arg, skip_fields={"interface", "function_str"} |
| 98 | + ) |
| 99 | + outputs = traitedspec_to_fields(interface._outputs(), out) |
| 100 | + |
| 101 | + task_class = build_task_class( |
| 102 | + Nipype1Task, |
| 103 | + Nipype1Outputs, |
| 104 | + inputs, |
| 105 | + outputs, |
| 106 | + name=type(interface).__name__, |
| 107 | + klass=None, |
| 108 | + bases=(), |
| 109 | + outputs_bases=(), |
| 110 | + ) |
| 111 | + |
| 112 | + task_class._interface = interface |
| 113 | + |
| 114 | + return task_class |
| 115 | + |
| 116 | + |
| 117 | +class Nipype1Outputs(base.Outputs): |
| 118 | + |
| 119 | + @classmethod |
| 120 | + def _from_job(cls, job: "Job[Nipype1Outputs]") -> ty.Self: |
| 121 | + """Collect the outputs of a job from a combination of the provided inputs, |
| 122 | + the objects in the output directory, and the stdout and stderr of the process. |
| 123 | +
|
| 124 | + Parameters |
| 125 | + ---------- |
| 126 | + job : Job[Task] |
| 127 | + The job whose outputs are being collected. |
| 128 | + outputs_dict : dict[str, ty.Any] |
| 129 | + The outputs of the job, as a dictionary |
| 130 | +
|
| 131 | + Returns |
| 132 | + ------- |
| 133 | + outputs : Outputs |
| 134 | + The outputs of the job in dataclass |
| 135 | + """ |
| 136 | + outputs = super()._from_task(job) |
| 137 | + for name, val in job.return_values.items(): |
| 138 | + setattr(outputs, name, val) |
| 139 | + return outputs |
| 140 | + |
| 141 | + @classmethod |
| 142 | + def _from_task(cls, job: "Job[Nipype1Outputs]") -> ty.Self: |
| 143 | + # Added for backwards compatibility |
| 144 | + return cls._from_job(job) |
| 145 | + |
| 146 | + |
| 147 | +class Nipype1Task(base.Task): |
| 148 | + """Wrap a Nipype 1.x Interface as a Pydra Task |
| 149 | +
|
| 150 | + This utility translates the Nipype 1 input and output specs to |
| 151 | + Pydra-style specs, wraps the run command, and exposes the output |
| 152 | + in Pydra Task outputs. |
| 153 | +
|
| 154 | + >>> import pytest |
| 155 | + >>> from pydra.tasks.nipype1.tests import load_resource |
| 156 | + >>> from nipype.interfaces import fsl |
| 157 | + >>> if fsl.Info.version() is None: |
| 158 | + ... pytest.skip() |
| 159 | + >>> img = load_resource('nipype', 'testing/data/tpms_msk.nii.gz') |
| 160 | +
|
| 161 | + >>> from pydra.tasks.nipype1.utils import Nipype1Task |
| 162 | + >>> thresh = Nipype1Task(fsl.Threshold()) |
| 163 | + >>> thresh.inputs.in_file = img |
| 164 | + >>> thresh.inputs.thresh = 0.5 |
| 165 | + >>> res = thresh() |
| 166 | + >>> res.output.out_file # DOCTEST: +ELLIPSIS |
| 167 | + '.../tpms_msk_thresh.nii.gz' |
| 168 | + """ |
| 169 | + |
| 170 | + _task_type = "nipype1" |
| 171 | + |
| 172 | + def _run(self, job: "Job[Nipype1Task]", rerun: bool = False) -> None: |
| 173 | + fields = task_fields(self) |
| 174 | + inputs = { |
| 175 | + n: v if not isinstance(v, FileSet) else str(v) |
| 176 | + for n, v in task_dict(self).items() |
| 177 | + if v is not None or fields[n].mandatory |
| 178 | + } |
| 179 | + node = nipype.Node( |
| 180 | + self._interface, base_dir=job.cache_dir, name=type(self).__name__ |
| 181 | + ) |
| 182 | + node.inputs.trait_set(**inputs) |
| 183 | + res = node.run() |
| 184 | + job.return_values = res.outputs.get() |
| 185 | + |
| 186 | + |
| 187 | +FieldType = ty.TypeVar("FieldType", bound=arg | out) |
| 188 | + |
| 189 | + |
| 190 | +def traitedspec_to_fields( |
| 191 | + traitedspec, field_type: type[FieldType], skip_fields: set[str] = set() |
| 192 | +) -> dict[str, FieldType]: |
| 193 | + trait_names = set(traitedspec.copyable_trait_names()) |
| 194 | + fields = {} |
| 195 | + for name, trait in traitedspec.traits().items(): |
| 196 | + if name in skip_fields: |
| 197 | + continue |
| 198 | + type_ = TYPE_CONVERSIONS.get(type(trait.trait_type), ty.Any) |
| 199 | + if not trait.mandatory: |
| 200 | + type_ = type_ | None |
| 201 | + default = None |
| 202 | + else: |
| 203 | + default = base.NO_DEFAULT |
| 204 | + if name in trait_names: |
| 205 | + fields[name] = field_type( |
| 206 | + name=name, help=trait.desc, type=type_, default=default |
| 207 | + ) |
| 208 | + return fields |
| 209 | + |
| 210 | + |
| 211 | +Task = Nipype1Task |
| 212 | +Outputs = Nipype1Outputs |
| 213 | + |
| 214 | + |
| 215 | +TYPE_CONVERSIONS = { |
| 216 | + nipype.interfaces.base.traits_extension.File: File, |
| 217 | + nipype.interfaces.base.traits_extension.Directory: Directory, |
| 218 | +} |
0 commit comments