diff --git a/docs/source/tutorial/2-advanced-execution.ipynb b/docs/source/tutorial/2-advanced-execution.ipynb index 164ab87fd..e09ea94e7 100644 --- a/docs/source/tutorial/2-advanced-execution.ipynb +++ b/docs/source/tutorial/2-advanced-execution.ipynb @@ -345,11 +345,15 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Of course for this to work Docker needs to work and be configured for\n", - "[sudo-less execution](https://docs.docker.com/engine/install/linux-postinstall/).\n", + "Note that for this to work, Docker needs to be installed and configured for\n", + "[sudo-less execution](https://docs.docker.com/engine/install/linux-postinstall/). If\n", + "the command you want to execute is the entrypoint of the container, you can set the\n", + "`executable` field to `None`, e.g.\n", + "`MrGrid(executable=None, in_file=nifti_file, operation=\"regrid\", voxel=(0.5, 0.5, 0.5))(environment=docker.Environment(\"mrgrid-image-with-entrypoint\"))`.\n", "See [Containers and Environments](../explanation/environments.rst) for more details on\n", "how to utilise containers and add support for other software environments.\n", "\n", + "\n", "It is also possible to specify functions to run at hooks that are immediately before and after\n", "the task is executed by passing a `pydra.engine.hooks.TaskHooks` object to the `hooks`\n", "keyword arg. 
The callable should take the `pydra.engine.job.Job` object as its only\n", diff --git a/pydra/compose/shell/builder.py b/pydra/compose/shell/builder.py index 82030403e..def514ef5 100644 --- a/pydra/compose/shell/builder.py +++ b/pydra/compose/shell/builder.py @@ -33,6 +33,18 @@ from .task import Task, Outputs +def executable_validator(_, __, value): + """Validator for the executable attribute of a task""" + if value is None: + return + if not isinstance(value, (str, list)): + raise TypeError( + f"executable must be a string or a list of strings, not {value!r}" + ) + if len(value) == 0: + raise ValueError("executable must be a non-empty string or a list of strings") + + @dataclass_transform( kw_only_default=True, field_specifiers=(field.out, field.outarg), @@ -130,13 +142,17 @@ def make( f"Shell task class {wrapped} must have an `executable` " "attribute that specifies the command to run" ) from None - if not isinstance(executable, str) and not ( - isinstance(executable, ty.Sequence) - and all(isinstance(e, str) for e in executable) + if ( + executable is not None + and not isinstance(executable, str) + and not ( + isinstance(executable, ty.Sequence) + and all(isinstance(e, str) for e in executable) + ) ): raise ValueError( - "executable must be a string or a sequence of strings" - f", not {executable!r}" + "executable must be a string or a sequence of strings or None if " + f"the command run is the entrypoint of a container, not {executable!r}" ) class_name = klass.__name__ check_explicit_fields_are_none(klass, inputs, outputs) @@ -199,11 +215,11 @@ def make( ) parsed_inputs["executable"] = field.arg( name="executable", - type=str | ty.Sequence[str], + type=str | ty.Sequence[str] | None, argstr="", position=0, default=executable, - validator=attrs.validators.min_len(1), + validator=executable_validator, help=Task.EXECUTABLE_HELP, ) diff --git a/pydra/compose/shell/task.py b/pydra/compose/shell/task.py index 3bdce0788..60433f6c4 100644 --- 
a/pydra/compose/shell/task.py +++ b/pydra/compose/shell/task.py @@ -254,6 +254,12 @@ class ShellTask(base.Task[ShellOutputsType]): def _run(self, job: "Job[ShellTask]", rerun: bool = True) -> None: """Run the shell command.""" + + if self.executable is None and not job.environment.has_entrypoint: + raise ValueError( + "executable is not set, and the environment is not a container " + f"({job.environment}) with an entrypoint" + ) job.return_values = job.environment.execute(job) @property @@ -288,12 +294,12 @@ def _command_args(self, values: dict[str, ty.Any]) -> list[str]: if is_fileset_or_union(fld.type) and type(fld_value) is bool: del values[fld.name] # Drop special fields that are added separately - del values["executable"] del values["append_args"] # Add executable - pos_args = [ - self._command_shelltask_executable(fld, self.executable), - ] # list for (position, command arg) + pos_args = [] + if self.executable is not None: + pos_args.append(self._executable_pos_arg(fld, self.executable)) + del values["executable"] positions_provided = [0] fields = {f.name: f for f in get_fields(self)} for field_name in values: @@ -312,9 +318,9 @@ def _command_args(self, values: dict[str, ty.Any]) -> list[str]: command_args += self.append_args return command_args - def _command_shelltask_executable( - self, fld: field.arg, value: ty.Any - ) -> tuple[int, ty.Any]: + def _executable_pos_arg( + self, fld: field.arg, value: str | list[str] | None + ) -> tuple[int, str | list[str] | None]: """Returning position and value for executable Task input""" pos = 0 # executable should be the first el. 
of the command assert value diff --git a/pydra/compose/shell/tests/test_shell_fields.py b/pydra/compose/shell/tests/test_shell_fields.py index 96189c602..15b33fa3c 100644 --- a/pydra/compose/shell/tests/test_shell_fields.py +++ b/pydra/compose/shell/tests/test_shell_fields.py @@ -6,7 +6,7 @@ import cloudpickle as cp from pydra.compose import shell from pydra.utils.general import get_fields, task_help, wrap_text -from pydra.compose.shell.builder import _InputPassThrough +from pydra.compose.shell.builder import _InputPassThrough, executable_validator from fileformats.generic import File, Directory, FsObject from fileformats import text, image from pydra.utils.typing import MultiInputObj @@ -26,9 +26,9 @@ def test_interface_template(): assert sorted_fields(Cp) == [ shell.arg( name="executable", - validator=attrs.validators.min_len(1), + validator=executable_validator, default="cp", - type=str | ty.Sequence[str], + type=str | ty.Sequence[str] | None, position=0, help=shell.Task.EXECUTABLE_HELP, ), @@ -81,9 +81,9 @@ def test_interface_template_w_types_and_path_template_ext(): assert sorted_fields(TrimPng) == [ shell.arg( name="executable", - validator=attrs.validators.min_len(1), + validator=executable_validator, default="trim-png", - type=str | ty.Sequence[str], + type=str | ty.Sequence[str] | None, position=0, help=shell.Task.EXECUTABLE_HELP, ), @@ -122,9 +122,9 @@ def test_interface_template_w_modify(): assert sorted_fields(TrimPng) == [ shell.arg( name="executable", - validator=attrs.validators.min_len(1), + validator=executable_validator, default="trim-png", - type=str | ty.Sequence[str], + type=str | ty.Sequence[str] | None, position=0, help=shell.Task.EXECUTABLE_HELP, ), @@ -181,9 +181,9 @@ def test_interface_template_more_complex(): assert sorted_fields(Cp) == [ shell.arg( name="executable", - validator=attrs.validators.min_len(1), + validator=executable_validator, default="cp", - type=str | ty.Sequence[str], + type=str | ty.Sequence[str] | None, position=0, 
help=shell.Task.EXECUTABLE_HELP, ), @@ -281,9 +281,9 @@ def test_interface_template_with_overrides_and_optionals(): assert sorted_fields(Cp) == [ shell.arg( name="executable", - validator=attrs.validators.min_len(1), + validator=executable_validator, default="cp", - type=str | ty.Sequence[str], + type=str | ty.Sequence[str] | None, position=0, help=shell.Task.EXECUTABLE_HELP, ), @@ -353,9 +353,9 @@ def test_interface_template_with_defaults(): assert sorted_fields(Cp) == [ shell.arg( name="executable", - validator=attrs.validators.min_len(1), + validator=executable_validator, default="cp", - type=str | ty.Sequence[str], + type=str | ty.Sequence[str] | None, position=0, help=shell.Task.EXECUTABLE_HELP, ), @@ -421,9 +421,9 @@ def test_interface_template_with_type_overrides(): assert sorted_fields(Cp) == [ shell.arg( name="executable", - validator=attrs.validators.min_len(1), + validator=executable_validator, default="cp", - type=str | ty.Sequence[str], + type=str | ty.Sequence[str] | None, position=0, help=shell.Task.EXECUTABLE_HELP, ), @@ -738,9 +738,9 @@ class Outputs(shell.Outputs): assert sorted_fields(A) == [ shell.arg( name="executable", - validator=attrs.validators.min_len(1), + validator=executable_validator, default="cp", - type=str | ty.Sequence[str], + type=str | ty.Sequence[str] | None, argstr="", position=0, help=shell.Task.EXECUTABLE_HELP, @@ -1004,7 +1004,7 @@ def test_shell_help1(): "----------------------------", "", "Inputs:", - "- executable: str | Sequence[str]; default = 'shelly'", + "- executable: str | Sequence[str] | None; default = 'shelly'", " the first part of the command, can be a string, e.g. 
'ls', or a list, e.g.", " ['ls', '-l', 'dirname']", "- in_file: generic/file", diff --git a/pydra/environments/base.py b/pydra/environments/base.py index 96914224e..aae77ff33 100644 --- a/pydra/environments/base.py +++ b/pydra/environments/base.py @@ -26,6 +26,8 @@ class Environment: TODO: add setup and teardown methods """ + has_entrypoint = False + def setup(self): pass @@ -107,6 +109,8 @@ class Container(Environment): Extra arguments to be passed to the container """ + has_entrypoint = True + image: str tag: str = "latest" root: str = "/mnt/pydra" @@ -200,52 +204,46 @@ def map_path(fileset: os.PathLike | FileSet) -> Path: return bindings, values -def execute(cmd, strip=False): - """ - Run the event loop with coroutine. - - Uses :func:`read_and_display_async` unless a loop is - already running, in which case :func:`read_and_display` - is used. +def read_and_display( + *cmd: str, strip: bool = False, hide_display: bool = False +) -> dict[str, int | str]: + """Run a command and capture its return code, standard output and standard error. Parameters ---------- - cmd : :obj:`list` or :obj:`tuple` - The command line to be executed. - strip : :obj:`bool` - TODO - - """ - rc, stdout, stderr = read_and_display(*cmd, strip=strip) + cmd : str + The command to execute, passed as separate string arguments. + strip : bool, optional + If True, leading and trailing whitespace is stripped from standard output. + hide_display : bool, optional + Not used by the current implementation. + + Returns + ------- + dict[str, int | str] + A dictionary with "return_code", "stdout" and "stderr" keys. + + Raises + ------ + RuntimeError + If the return code is not 0, a RuntimeError is raised with a formatted + error message. 
""" - loop = get_open_loop() - if loop.is_running(): - rc, stdout, stderr = read_and_display(*cmd, strip=strip) - else: - rc, stdout, stderr = loop.run_until_complete( - read_and_display_async(*cmd, strip=strip) - ) - """ - return rc, stdout, stderr - - -def read_and_display(*cmd, strip=False, hide_display=False): - """Capture a process' standard output.""" try: process = sp.run(cmd, stdout=sp.PIPE, stderr=sp.PIPE) except Exception: # TODO editing some tracing? raise + stdout = process.stdout.decode("utf-8") if strip: - return ( - process.returncode, - process.stdout.decode("utf-8").strip(), - process.stderr.decode("utf-8"), - ) - else: - return ( - process.returncode, - process.stdout.decode("utf-8"), - process.stderr.decode("utf-8"), - ) + stdout = stdout.strip() + stderr = process.stderr.decode("utf-8") + if process.returncode: + msg = f"Error executing command {' '.join(cmd)!r} (code: {process.returncode}):" + if stderr: + msg += "\n\nstderr:\n" + stderr + if stdout: + msg += "\n\nstdout:\n" + stdout + raise RuntimeError(msg) + return {"return_code": process.returncode, "stdout": stdout, "stderr": stderr} diff --git a/pydra/environments/docker.py b/pydra/environments/docker.py index 03179f0da..bee66e25f 100644 --- a/pydra/environments/docker.py +++ b/pydra/environments/docker.py @@ -28,19 +28,11 @@ def execute(self, job: "Job[shell.Task]") -> dict[str, ty.Any]: ).split() ) docker_args.extend(["-w", f"{self.root}{job.cache_dir}"]) - keys = ["return_code", "stdout", "stderr"] job.cache_dir.mkdir(exist_ok=True) - values = base.execute( - docker_args + [docker_img] + job.task._command_args(values=values), + return base.read_and_display( + *(docker_args + [docker_img] + job.task._command_args(values=values)), ) - output = dict(zip(keys, values)) - if output["return_code"]: - if output["stderr"]: - raise RuntimeError(output["stderr"]) - else: - raise RuntimeError(output["stdout"]) - return output # Alias so it can be referred to as docker.Environment diff --git 
a/pydra/environments/native.py b/pydra/environments/native.py index fa51108d0..04e02aa8a 100644 --- a/pydra/environments/native.py +++ b/pydra/environments/native.py @@ -15,18 +15,8 @@ class Native(base.Environment): """ def execute(self, job: "Job[shell.Task]") -> dict[str, ty.Any]: - keys = ["return_code", "stdout", "stderr"] cmd_args = job.task._command_args(values=job.inputs) - values = base.execute(cmd_args) - output = dict(zip(keys, values)) - if output["return_code"]: - msg = f"Error running '{job.name}' job with {cmd_args}:" - if output["stderr"]: - msg += "\n\nstderr:\n" + output["stderr"] - if output["stdout"]: - msg += "\n\nstdout:\n" + output["stdout"] - raise RuntimeError(msg) - return output + return base.read_and_display(*cmd_args) # Alias so it can be referred to as native.Environment diff --git a/pydra/environments/singularity.py b/pydra/environments/singularity.py index a7fc0efff..9e1157459 100644 --- a/pydra/environments/singularity.py +++ b/pydra/environments/singularity.py @@ -31,21 +31,14 @@ def execute(self, job: "Job[shell.Task]") -> dict[str, ty.Any]: singularity_args.extend( ["--pwd", f"{self.root.rstrip('/')}{job.cache_dir.absolute()}"] ) - keys = ["return_code", "stdout", "stderr"] - job.cache_dir.mkdir(exist_ok=True) - values = base.execute( - singularity_args - + [singularity_img] - + job.task._command_args(values=values), + return base.read_and_display( + *( + singularity_args + + [singularity_img] + + job.task._command_args(values=values) + ) ) - output = dict(zip(keys, values)) - if output["return_code"]: - if output["stderr"]: - raise RuntimeError(output["stderr"]) - else: - raise RuntimeError(output["stdout"]) - return output # Alias so it can be referred to as singularity.Environment diff --git a/pydra/environments/tests/test_environments.py b/pydra/environments/tests/test_environments.py index b39ed8ee8..2573faae3 100644 --- a/pydra/environments/tests/test_environments.py +++ b/pydra/environments/tests/test_environments.py @@ 
-503,3 +503,50 @@ def newcache(x): "file_1_copy.txt", "file_2_copy.txt", ] + + +@no_win +@need_docker +def test_entrypoint(tmp_path): + """docker env: task with executable=None runs the entrypoint of the image""" + + import docker as docker_engine + + dc = docker_engine.from_env() + + # Create a minimal entrypoint script that echoes a greeting followed by + # its first argument + build_dir = tmp_path / "build" + build_dir.mkdir() + entrypoint = build_dir / "entrypoint.sh" + with open(entrypoint, "w") as f: + f.write("#!/bin/sh\necho hello $1") + + IMAGE_TAG = "pydra-test-entrypoint" + + # Build a test image that uses the script above as its entrypoint + with open(build_dir / "Dockerfile", "w") as f: + f.write( + """FROM busybox +ADD ./entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh +ENTRYPOINT ["/entrypoint.sh"]""" + ) + + dc.images.build(path=str(build_dir), tag=IMAGE_TAG + ":latest") + + @shell.define + class TestEntrypoint(shell.Task): + """task whose command is the entrypoint of the container""" + + executable = None + persons_name: str = shell.arg(help="the name of the person to say hello to") + + class Outputs(shell.Outputs): + pass + + test_entrypoint = TestEntrypoint(persons_name="Guido") + + outputs = test_entrypoint(environment=docker.Environment(image=IMAGE_TAG)) + + assert outputs.stdout == "hello Guido\n"