diff --git a/README.md b/README.md index 31f2a0e..b8c392c 100644 --- a/README.md +++ b/README.md @@ -22,13 +22,13 @@ This library requires: ## Versioning -This package's version follows [Semantic Versioning 2.0](https://semver.org/), but is still considered to be in its +This package's version follows [Semantic Versioning 2.0](https://semver.org/), but is still considered to be in its initial development, thus backwards incompatible versions are denoted by minor version bumps. To help illustrate how versions will increment during this initial development stage, they are described below: -1. The MAJOR version is currently 0, indicating initial development. -2. The MINOR version is currently incremented when backwards incompatible changes are introduced to the public API. -3. The PATCH version is currently incremented when bug fixes or backwards compatible changes are introduced to the public API. +1. The MAJOR version is currently 0, indicating initial development. +2. The MINOR version is currently incremented when backwards incompatible changes are introduced to the public API. +3. The PATCH version is currently incremented when bug fixes or backwards compatible changes are introduced to the public API. ## Contributing @@ -64,7 +64,7 @@ Template at '/path/to/job.template.json' passes validation checks! ### `summary` -Displays summary information about a sample Job or Step, and the Steps and Tasks therein. The user may provide parameters to +Displays summary information about a sample Job or Step, and the Steps and Tasks therein. The user may provide parameters to customize the Job, as parameters can have an impact on the amount of Steps and Tasks that a job consists of. #### Arguments @@ -145,7 +145,7 @@ Session ended successfully Job: MyJob Step: Step1 Duration: 1.0 seconds -Tasks run: 1 +Chunks run: 1 ``` @@ -181,11 +181,11 @@ See [VERIFYING_PGP_SIGNATURE](VERIFYING_PGP_SIGNATURE.md) for more information. ## Security -We take all security reports seriously. 
When we receive such reports, we will -investigate and subsequently address any potential vulnerabilities as quickly -as possible. If you discover a potential security issue in this project, please +We take all security reports seriously. When we receive such reports, we will +investigate and subsequently address any potential vulnerabilities as quickly +as possible. If you discover a potential security issue in this project, please notify AWS/Amazon Security via our [vulnerability reporting page](http://aws.amazon.com/security/vulnerability-reporting/) -or directly via email to [AWS Security](aws-security@amazon.com). Please do not +or directly via email to [AWS Security](aws-security@amazon.com). Please do not create a public GitHub issue in this project. ## License diff --git a/pyproject.toml b/pyproject.toml index 923e747..bcc5d81 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,8 +29,8 @@ classifiers = [ "Intended Audience :: End Users/Desktop" ] dependencies = [ - "openjd-sessions == 0.10.*", - "openjd-model == 0.6.*" + "openjd-sessions >= 0.10.1,< 0.11", + "openjd-model == 0.7.*" ] [project.urls] diff --git a/src/openjd/__main__.py b/src/openjd/__main__.py index c324d07..f777da6 100644 --- a/src/openjd/__main__.py +++ b/src/openjd/__main__.py @@ -1,26 +1,6 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
-import sys -import traceback +from .cli._create_argparser import main -from .cli._create_argparser import create_argparser -__all__ = ("main",) - - -def main() -> None: - parser = create_argparser() - - args = parser.parse_args(sys.argv[1:]) - try: - # Raises: - # SystemExit - on failure - args.func(args) - except Exception as exc: - print(f"ERROR: {str(exc)}", file=sys.stderr) - traceback.print_exc() - sys.exit(1) - - -if __name__ == "__main__": - main() +main() diff --git a/src/openjd/cli/__init__.py b/src/openjd/cli/__init__.py index 8d929cc..e246a21 100644 --- a/src/openjd/cli/__init__.py +++ b/src/openjd/cli/__init__.py @@ -1 +1,5 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + +from ._create_argparser import main + +__all__ = ["main"] diff --git a/src/openjd/cli/_check/__init__.py b/src/openjd/cli/_check/__init__.py index ab2f41e..8836062 100644 --- a/src/openjd/cli/_check/__init__.py +++ b/src/openjd/cli/_check/__init__.py @@ -1,7 +1,7 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
from .._common import add_common_arguments, CommonArgument, SubparserGroup -from ._check_command import do_check +from ._check_command import add_check_arguments, do_check def populate_argparser(subcommands: SubparserGroup) -> None: @@ -12,7 +12,7 @@ def populate_argparser(subcommands: SubparserGroup) -> None: description="Given an Open Job Description template file, parse the file and run validation checks against it to ensure that it is correctly formed.", ) - # `check` has no unique arguments; # add all arguments through `add_common_arguments` add_common_arguments(check_parser, {CommonArgument.PATH}) + add_check_arguments(check_parser) check_parser.set_defaults(func=do_check) diff --git a/src/openjd/cli/_check/_check_command.py b/src/openjd/cli/_check/_check_command.py index 142a2c5..b3297ea 100644 --- a/src/openjd/cli/_check/_check_command.py +++ b/src/openjd/cli/_check/_check_command.py @@ -1,6 +1,6 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -from argparse import Namespace +from argparse import ArgumentParser, Namespace from openjd.model import ( DecodeValidationError, TemplateSpecificationVersion, @@ -8,13 +8,25 @@ decode_environment_template, ) -from .._common import read_template, OpenJDCliResult, print_cli_result +from .._common import ( + add_extensions_argument, + read_template, + OpenJDCliResult, + print_cli_result, + process_extensions_argument, +) + + +def add_check_arguments(run_parser: ArgumentParser): + add_extensions_argument(run_parser) @print_cli_result def do_check(args: Namespace) -> OpenJDCliResult: """Open a provided template file and check its schema for errors.""" + extensions = process_extensions_argument(args.extensions) + try: # Raises: RuntimeError template_object = read_template(args.path) @@ -27,7 +39,7 @@ def do_check(args: Namespace) -> OpenJDCliResult: # Raises: DecodeValidationError if TemplateSpecificationVersion.is_job_template(template_version): - decode_job_template(template=template_object, 
supported_extensions=["TASK_CHUNKING"]) + decode_job_template(template=template_object, supported_extensions=extensions) elif TemplateSpecificationVersion.is_environment_template(template_version): decode_environment_template(template=template_object) else: diff --git a/src/openjd/cli/_common/__init__.py b/src/openjd/cli/_common/__init__.py index caa6cb8..ee9d9fd 100644 --- a/src/openjd/cli/_common/__init__.py +++ b/src/openjd/cli/_common/__init__.py @@ -9,6 +9,7 @@ import yaml import os +from ._extensions import add_extensions_argument, process_extensions_argument from ._job_from_template import ( job_from_template, get_job_params, @@ -23,9 +24,11 @@ from openjd.model import DecodeValidationError, Job __all__ = [ + "add_extensions_argument", "get_doc_type", "get_job_params", "get_params_from_file", + "process_extensions_argument", "read_template", "read_job_template", "read_environment_template", @@ -109,10 +112,10 @@ def add(self, name: str, description: str, **kwargs) -> ArgumentParser: return self.group.add_parser(name, **kwargs) -def generate_job(args: Namespace) -> Job: +def generate_job(args: Namespace, *, supported_extensions: list[str]) -> Job: try: # Raises: RuntimeError, DecodeValidationError - template = read_job_template(args.path) + template = read_job_template(args.path, supported_extensions=supported_extensions) # Raises: RuntimeError return job_from_template( template, diff --git a/src/openjd/cli/_common/_extensions.py b/src/openjd/cli/_common/_extensions.py new file mode 100644 index 0000000..bd1a8f8 --- /dev/null +++ b/src/openjd/cli/_common/_extensions.py @@ -0,0 +1,34 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ +from argparse import ArgumentParser +from typing import Optional + +# This is the list of Open Job Description extensions with implemented support +SUPPORTED_EXTENSIONS = ["TASK_CHUNKING"] + + +def add_extensions_argument(run_parser: ArgumentParser): + run_parser.add_argument( + "--extensions", + help=f"A comma-separated list of Open Job Description extension names to enable. Defaults to all that are implemented: {','.join(SUPPORTED_EXTENSIONS)}", + ) + + +def process_extensions_argument(extensions: Optional[str]) -> list[str]: + """Process the comma-separated extensions argument and return a list of supported extensions.""" + + # If the option is not provided, default to all the supported extensions. + if extensions is None: + return SUPPORTED_EXTENSIONS + + extensions_list = [ + extension.strip().upper() for extension in extensions.split(",") if extension.strip() != "" + ] + + unsupported_extensions = set(extensions_list) - set(SUPPORTED_EXTENSIONS) + if unsupported_extensions: + raise ValueError( + f"Unsupported Open Job Description extension(s): {', '.join(sorted(unsupported_extensions))}" + ) + + return extensions_list diff --git a/src/openjd/cli/_common/_validation_utils.py b/src/openjd/cli/_common/_validation_utils.py index a2628fd..9b051ff 100644 --- a/src/openjd/cli/_common/_validation_utils.py +++ b/src/openjd/cli/_common/_validation_utils.py @@ -53,7 +53,7 @@ def read_template(template_file: Path) -> dict[str, Any]: return template_object -def read_job_template(template_file: Path) -> JobTemplate: +def read_job_template(template_file: Path, *, supported_extensions: list[str]) -> JobTemplate: """Open a JSON or YAML-formatted file and attempt to parse it into a JobTemplate object. Raises a RuntimeError if the file doesn't exist or can't be opened, and raises a DecodeValidationError if its contents can't be parsed into a valid JobTemplate. 
@@ -62,7 +62,9 @@ def read_job_template(template_file: Path) -> JobTemplate: template_object = read_template(template_file) # Raises: DecodeValidationError - template = decode_job_template(template=template_object, supported_extensions=["TASK_CHUNKING"]) + template = decode_job_template( + template=template_object, supported_extensions=supported_extensions + ) return template diff --git a/src/openjd/cli/_create_argparser.py b/src/openjd/cli/_create_argparser.py index 25a950d..bc18e5a 100644 --- a/src/openjd/cli/_create_argparser.py +++ b/src/openjd/cli/_create_argparser.py @@ -1,6 +1,9 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. from argparse import ArgumentParser +import sys +import traceback +from typing import Optional from ._common import SubparserGroup @@ -28,3 +31,21 @@ def create_argparser() -> ArgumentParser: populate_run_subparser(subcommands) populate_schema_subparser(subcommands) return parser + + +def main(arg_list: Optional[list[str]] = None) -> None: + """Main function for invoking the CLI""" + parser = create_argparser() + + if arg_list is None: + arg_list = sys.argv[1:] + + args = parser.parse_args(arg_list) + try: + # Raises: + # SystemExit - on failure + args.func(args) + except Exception as exc: + print(f"ERROR: {str(exc)}", file=sys.stderr) + traceback.print_exc() + sys.exit(1) diff --git a/src/openjd/cli/_run/__init__.py b/src/openjd/cli/_run/__init__.py index bd3ef95..6d1af40 100644 --- a/src/openjd/cli/_run/__init__.py +++ b/src/openjd/cli/_run/__init__.py @@ -8,7 +8,7 @@ def populate_argparser(subcommands: SubparserGroup) -> None: """Adds the `run` command and all of its arguments to the given parser.""" run_parser = subcommands.add( "run", - description="Takes a Job Template and Step name, then runs Tasks from that Step.", + description="Takes a Job Template and runs the entire job or a selected Step from the job.", usage="openjd run JOB_TEMPLATE_PATH [arguments]", ) add_common_arguments(run_parser, 
{CommonArgument.PATH, CommonArgument.JOB_PARAMS}) diff --git a/src/openjd/cli/_run/_local_session/_actions.py b/src/openjd/cli/_run/_local_session/_actions.py index 74b477c..d8b757c 100644 --- a/src/openjd/cli/_run/_local_session/_actions.py +++ b/src/openjd/cli/_run/_local_session/_actions.py @@ -1,10 +1,27 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +from enum import Enum + from openjd.model import Step, TaskParameterSet from openjd.model.v2023_09 import Environment from openjd.sessions import Session +class EnvironmentType(str, Enum): + """ + The three types of environment (EXTERNAL, JOB, STEP) that can be entered/exited in a session, plus ALL as a wildcard that matches any type. + """ + + EXTERNAL = "EXTERNAL" + JOB = "JOB" + STEP = "STEP" + ALL = "ALL" + + def matches(self, other: "EnvironmentType") -> bool: + """Environment types match if they are equal, or one of them is ALL.""" + return self == other or self == EnvironmentType.ALL or other == EnvironmentType.ALL + + class SessionAction: _session: Session duration: float @@ -56,13 +73,17 @@ def __str__(self): class ExitEnvironmentAction(SessionAction): _id: str + _keep_session_running: bool - def __init__(self, session: Session, id: str): + def __init__(self, session: Session, id: str, keep_session_running: bool): super(ExitEnvironmentAction, self).__init__(session) self._id = id + self._keep_session_running = keep_session_running def run(self): - self._session.exit_environment(identifier=self._id) + self._session.exit_environment( + identifier=self._id, keep_session_running=self._keep_session_running + ) def __str__(self): return f"Exit Environment '{self._id}'" diff --git a/src/openjd/cli/_run/_local_session/_logs.py b/src/openjd/cli/_run/_local_session/_logs.py index 18ea4c0..194fcfa 100644 --- a/src/openjd/cli/_run/_local_session/_logs.py +++ b/src/openjd/cli/_run/_local_session/_logs.py @@ -1,8 +1,19 @@ # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
-import time from dataclasses import dataclass from logging import Handler, LogRecord +from datetime import datetime, timezone +from enum import Enum + + +class LoggingTimestampFormat(str, Enum): + """ + Different formats for the timestamp of each log entry + """ + + RELATIVE = "relative" + LOCAL = "local" + UTC = "utc" @dataclass @@ -23,18 +34,41 @@ class LocalSessionLogHandler(Handler): A custom Handler that formats and records logs in a dataclass. Used to print logs to `stdout` in real time while also storing them in memory. + + It prints a timestamp with each log entry, formatted per the configured LoggingTimestampFormat (relative to session start, local time, or UTC). """ messages: list[LogEntry] = [] _should_print: bool + _session_start_timestamp: datetime + _timestamp_format: LoggingTimestampFormat - def __init__(self, should_print: bool): + def __init__( + self, + should_print: bool, + session_start_timestamp: datetime, + timestamp_format: LoggingTimestampFormat, + ): super(LocalSessionLogHandler, self).__init__() self._should_print = should_print + self._session_start_timestamp = session_start_timestamp + self._timestamp_format = timestamp_format def handle(self, record: LogRecord) -> bool: + if self._timestamp_format == LoggingTimestampFormat.RELATIVE: + timestamp = str( + datetime.fromtimestamp(record.created, timezone.utc) - self._session_start_timestamp + ) + elif self._timestamp_format == LoggingTimestampFormat.LOCAL: + timestamp = str( + datetime.fromtimestamp(record.created, timezone.utc).astimezone().isoformat() + ) + else: + timestamp = str(datetime.fromtimestamp(record.created, timezone.utc).isoformat()) + + record.created = datetime.fromtimestamp(record.created, timezone.utc).timestamp() new_record = LogEntry( - timestamp=time.asctime(time.localtime(record.created)), + timestamp=timestamp, message=record.getMessage(), ) self.messages.append(new_record) diff --git a/src/openjd/cli/_run/_local_session/_session_manager.py b/src/openjd/cli/_run/_local_session/_session_manager.py index a4ef7ce..feb74a8 100644 --- 
a/src/openjd/cli/_run/_local_session/_session_manager.py +++ b/src/openjd/cli/_run/_local_session/_session_manager.py @@ -3,20 +3,27 @@ from queue import Queue from threading import Event import time -from typing import Optional, Type +from typing import Any, Iterable, Optional, Type from types import FrameType, TracebackType from signal import signal, SIGINT, SIGTERM, SIG_DFL - -from ._actions import EnterEnvironmentAction, ExitEnvironmentAction, RunTaskAction, SessionAction -from ._logs import LocalSessionLogHandler, LogEntry +from itertools import islice +from datetime import datetime, timedelta, timezone + +from ._actions import ( + EnterEnvironmentAction, + ExitEnvironmentAction, + RunTaskAction, + SessionAction, + EnvironmentType, +) +from ._logs import LocalSessionLogHandler, LogEntry, LoggingTimestampFormat from openjd.model import ( - EnvironmentTemplate, + IntRangeExpr, Job, JobParameterValues, ParameterValue, ParameterValueType, Step, - StepParameterSpace, StepParameterSpaceIterator, TaskParameterSet, ) @@ -30,28 +37,40 @@ ) +class LocalSessionFailed(RuntimeError): + """ + Raised when an action in the session fails. + """ + + def __init__(self, failed_action: SessionAction): + self.failed_action = failed_action + super().__init__(f"Action failed: {failed_action}") + + class LocalSession: """ - A wrapper for the `Session` object in the `sessions` module - that holds information about a locally-running Session launched - from the CLI. + A class to manage a `Session` object from the `sessions` module, + to run tasks of a job in a locally-running Session launched from the CLI. + + An OpenJD session's purpose is to run tasks from a single job. It can run + tasks from different steps, as long as it enters the step environments + before and exits them after. 
""" session_id: str failed: bool = False - ended: Event - tasks_run: int = 0 + task_run_count: int = 0 _job: Job _maximum_tasks: int - _start_seconds: float - _end_seconds: float - _inner_session: Session + _openjd_session: Session _enter_env_queue: Queue[EnterEnvironmentAction] _action_queue: Queue[RunTaskAction] _current_action: Optional[SessionAction] + _failed_action: Optional[SessionAction] _action_ended: Event _path_mapping_rules: Optional[list[PathMappingRule]] - _environments: Optional[list[EnvironmentTemplate]] + _environments: Optional[list[Any]] + _environments_entered: list[tuple[EnvironmentType, str]] _log_handler: LocalSessionLogHandler _cleanup_called: bool @@ -60,19 +79,20 @@ def __init__( *, job: Job, session_id: str, + timestamp_format: LoggingTimestampFormat = LoggingTimestampFormat.RELATIVE, path_mapping_rules: Optional[list[PathMappingRule]] = None, - environments: Optional[list[EnvironmentTemplate]] = None, + environments: Optional[list[Any]] = None, should_print_logs: bool = True, retain_working_dir: bool = False, ): self.session_id = session_id - self.ended = Event() self._action_ended = Event() self._job = job + self._timestamp_format = timestamp_format self._path_mapping_rules = path_mapping_rules self._environments = environments - # Create an inner Session + # Create an OpenJD Session job_parameters: JobParameterValues if job.parameters: job_parameters = { @@ -82,7 +102,7 @@ def __init__( else: job_parameters = dict[str, ParameterValue]() - self._inner_session = Session( + self._openjd_session = Session( session_id=self.session_id, job_parameter_values=job_parameters, path_mapping_rules=self._path_mapping_rules, @@ -90,20 +110,71 @@ def __init__( retain_working_dir=retain_working_dir, ) + self._should_print_logs = should_print_logs + self._cleanup_called = False + self._started = False + + self._current_action = None + self._failed_action = None + self._environments_entered = [] + # Initialize the action queue self._enter_env_queue: 
Queue[EnterEnvironmentAction] = Queue() self._action_queue: Queue[RunTaskAction] = Queue() - self._current_action = None - self._should_print_logs = should_print_logs - self._cleanup_called = False + def _context_manager_cleanup(self): + try: + # Exit all the environments that were entered + self.run_environment_exits(type=EnvironmentType.ALL, keep_session_running=False) + finally: + signal(SIGINT, SIG_DFL) + signal(SIGTERM, SIG_DFL) + self._started = False + + if self.failed: + LOG.info( + msg=f"Open Job Description CLI: ERROR executing action: '{self.failed_action}' (see Task logs for details)", + extra={"session_id": self.session_id}, + ) + else: + LOG.info( + msg="Open Job Description CLI: All actions completed successfully!", + extra={"session_id": self.session_id}, + ) + self.cleanup() + self._started = False def __enter__(self) -> "LocalSession": # Add log handling - self._log_handler = LocalSessionLogHandler(should_print=self._should_print_logs) + session_start_timestamp = datetime.now(timezone.utc) + self._log_handler = LocalSessionLogHandler( + should_print=self._should_print_logs, + session_start_timestamp=session_start_timestamp, + timestamp_format=self._timestamp_format, + ) LOG.addHandler(self._log_handler) + LOG.info( + msg=f"Open Job Description CLI: Session start {session_start_timestamp.astimezone().isoformat()}", + extra={"session_id": self.session_id}, + ) + LOG.info( + msg=f"Open Job Description CLI: Running job '{self._job.name}'", + extra={"session_id": self.session_id}, + ) signal(SIGINT, self._sigint_handler) signal(SIGTERM, self._sigint_handler) + + self._started = True + + # Enter all the external and job environments + try: + self.run_environment_enters(self._environments, EnvironmentType.EXTERNAL) + self.run_environment_enters(self._job.jobEnvironments, EnvironmentType.JOB) + except LocalSessionFailed: + # If __enter__ fails, __exit__ won't be called so need to clean up here + self._context_manager_cleanup() + raise + return self def 
__exit__( @@ -112,179 +183,206 @@ def __exit__( exc_value: Optional[BaseException], traceback: Optional[TracebackType], ) -> None: - signal(SIGINT, SIG_DFL) - signal(SIGTERM, SIG_DFL) - self.cleanup() + # __enter__ should have been called before __exit__ + if not self._started: + raise RuntimeError("Session was not started via a with statement.") + + self._context_manager_cleanup() def _sigint_handler(self, signum: int, frame: Optional[FrameType]) -> None: """Signal handler that is invoked when the process receives a SIGINT/SIGTERM""" LOG.info("Interruption signal recieved.") self.cancel() - def cleanup(self) -> None: - if not self._cleanup_called: - LOG.info( - msg="Open Job Description CLI: Local Session ended! Now cleaning up Session resources.", - extra={"session_id": self.session_id}, - ) - self._log_handler.close() - LOG.removeHandler(self._log_handler) - - self._inner_session.cleanup() - self._cleanup_called = True + def run_environment_enters(self, environments: Optional[list[Any]], type: EnvironmentType): + """Enter one or more environments in the session.""" + if environments is None: + return - def initialize( - self, - *, - dependencies: list[Step], - step: Step, - maximum_tasks: int = -1, - task_parameter_values: Optional[list[dict]] = None, - ) -> None: - """ - Queues up necessary actions for the Step. - - Args: - job: The Job this Step belongs to. - step: The Step object to run. - task_parameters: A list of Task parameter sets to run this Step with. - If not specified, defaults to the first Task parameter set defined in the Step's - parameter space. 
- """ - - self.ended.clear() - - session_environment_ids: list[str] = [] - # Enqueue "Enter Environment" actions for the given environments - if self._environments: - envs = [environ.environment for environ in self._environments] - session_environment_ids += self._add_environments(envs) - - # Enqueue "Enter Environment" actions for root level environments - if self._job.jobEnvironments: - session_environment_ids += self._add_environments(self._job.jobEnvironments) - - # Step-level environments can only be defined if there is a single Step, - # or else we can't run the required Steps in a single Session - if not dependencies and step.stepEnvironments: - session_environment_ids += self._add_environments(step.stepEnvironments) - - # Next, per dependency, enqueue "Run Task" actions for each set of Task parameters - # If the Step takes no parameters, we only need to enqueue a single Step with an empty parameter list - for dep in dependencies: - if not dep.parameterSpace: - self._action_queue.put( - RunTaskAction( - session=self._inner_session, - step=dep, - parameters=dict[str, ParameterValue](), - ) - ) - else: - for parameter_set in StepParameterSpaceIterator(space=dep.parameterSpace): - self._action_queue.put( - RunTaskAction( - session=self._inner_session, step=dep, parameters=parameter_set - ) - ) - - # The Step specified by the user is the only one that needs to use custom Task parameters, if given - if not step.parameterSpace: - self._action_queue.put(RunTaskAction(self._inner_session, step=step, parameters=dict())) - - else: - if not task_parameter_values: - parameter_sets: list[TaskParameterSet] = list( - StepParameterSpaceIterator(space=step.parameterSpace) - ) - else: - try: - parameter_sets = [ - self._generate_task_parameter_set( - parameter_space=step.parameterSpace, parameter_values=values - ) - for values in task_parameter_values - ] - - except RuntimeError as rte: - LOG.info( - f"Open Job Description CLI: Skipping Task parameter set with 
errors:\n{str(rte)}" - ) - - # Set the `failed` flag to indicate that there were problems, - # but continue running the Session in case there are parameter sets that still work - self.failed = True - - # Task maximum is only imposed if the user provides a positive value - if maximum_tasks > 0: - parameter_sets = parameter_sets[: min(maximum_tasks, len(parameter_sets))] - - for param_set in parameter_sets: - self._action_queue.put( - RunTaskAction(self._inner_session, step=step, parameters=param_set) - ) - - def run(self) -> None: - if self._inner_session.state != SessionState.READY: - raise RuntimeError("Session is not in a READY state") - - environments_entered = list[str]() - failed_action: Optional[SessionAction] = None - - self._start_seconds = time.perf_counter() + if self._openjd_session.state != SessionState.READY: + raise RuntimeError( + f"Session must be in READY state, but is in {self._openjd_session.state.name}" + ) - # Enter all of the Environments, and keep track of which ones we've entered - while not self._enter_env_queue.empty() and not self.failed: + for env in environments: + env_id = f"{type.name} - {env.name}" self._action_ended.clear() - action = self._enter_env_queue.get() - environments_entered.append(action._id) - self._current_action = action + self._current_action = EnterEnvironmentAction( + session=self._openjd_session, environment=env, env_id=env_id + ) + self._environments_entered.append((type, env_id)) self._current_action.run() self._action_ended.wait() if self.failed: - failed_action = self._current_action + self._failed_action = self._current_action + self._current_action = None + raise LocalSessionFailed(self._failed_action) + self._current_action = None - # Run all of the Tasks that are enqueued - while not self._action_queue.empty() and not self.failed: - self._action_ended.clear() - self._current_action = self._action_queue.get() - self._current_action.run() - self._action_ended.wait() - if self.failed: - failed_action = 
self._current_action + def run_environment_exits(self, type: EnvironmentType, *, keep_session_running: bool): + """Exit environments that were entered in this session, in reverse order. + Only exits environments matching the provided environment type. + """ + if self._openjd_session.state not in (SessionState.READY, SessionState.READY_ENDING): + raise RuntimeError( + f"Session must be in READY or READY_ENDING state, but is in {self._openjd_session.state.name}" + ) + + failed_action = None - # Exit all environments that were entered. We always try to exit all failed environments; even - # if an environment-enter, task, or environment exit failed. - while environments_entered: + while self._environments_entered and self._environments_entered[-1][0].matches(type): + env_id = self._environments_entered.pop()[1] prev_action_failed = self.failed self._action_ended.clear() self._current_action = ExitEnvironmentAction( - self._inner_session, environments_entered.pop() + session=self._openjd_session, id=env_id, keep_session_running=keep_session_running ) self._current_action.run() self._action_ended.wait() if self.failed and not prev_action_failed: - failed_action = self._current_action + failed_action = self._failed_action = self._current_action + self._current_action = None + + if failed_action: + raise LocalSessionFailed(failed_action) + + def run_task(self, step: Step, parameter_set: TaskParameterSet) -> None: + """Run a single task of a step in the session.""" + if self._openjd_session.state != SessionState.READY: + raise RuntimeError( + f"Session must be in READY state, but is in {self._openjd_session.state.name}" + ) + self._action_ended.clear() + self._current_action = RunTaskAction( + session=self._openjd_session, step=step, parameters=parameter_set + ) + self._current_action.run() + self._action_ended.wait() if self.failed: - # Action encountered an error; clean up resources and end session - LOG.info( - msg=f"Open Job Description CLI: ERROR executing action: 
'{str(failed_action)}' (see Task logs for details)", - extra={"session_id": self.session_id}, + self._failed_action = self._current_action + self._current_action = None + raise LocalSessionFailed(self._failed_action) + self._current_action = None + + def _run_tasks_adaptive_chunking( + self, step: Step, task_parameters: StepParameterSpaceIterator, maximum_tasks: Optional[int] + ): + """Runs all the tasks of the task_parameters iterator with adaptive chunking.""" + completed_task_count = 0 + completed_task_duration = 0.0 + target_runtime_seconds = int( + step.parameterSpace.taskParameterDefinitions[ # type: ignore + task_parameters.chunks_parameter_name # type: ignore + ].chunks.targetRuntimeSeconds + ) # type: ignore + + while True: + # Get the next chunk to run + parameter_set = next(task_parameters, None) + if parameter_set is None: + break + + start_seconds = time.perf_counter() + # This may raise a LocalSessionFailed exception + self.run_task(step, parameter_set) + duration = time.perf_counter() - start_seconds + + # Accumulate the task count and duration from running this chunk + completed_task_count += len( + IntRangeExpr.from_str(parameter_set[task_parameters.chunks_parameter_name].value) # type: ignore ) + completed_task_duration += duration + + # Estimate a chunk size based on the statistics, and update the iterator. Note that this + # logic is very simple, providing a good starting point that behaves reasonably for other implementations + # to follow. 
+ duration_per_task = completed_task_duration / completed_task_count + adaptive_chunk_size = target_runtime_seconds / duration_per_task + if ( + completed_task_count < 10 + and adaptive_chunk_size > task_parameters.chunks_default_task_count # type: ignore + ): + # When we have data about only a few tasks, gradually blend in the new estimate instead of cutting over immediately + adaptive_chunk_size = ( + 0.75 * task_parameters.chunks_default_task_count + 0.25 * adaptive_chunk_size # type: ignore + ) + adaptive_chunk_size = max(int(adaptive_chunk_size), 1) + if adaptive_chunk_size != task_parameters.chunks_default_task_count: + LOG.info( + msg=f"Open Job Description CLI: Ran {completed_task_count} tasks in {timedelta(seconds=completed_task_duration)}, average {timedelta(seconds=completed_task_duration / completed_task_count)}", + extra={"session_id": self.session_id}, + ) + LOG.info( + msg=f"Open Job Description CLI: Adjusting chunk size from {task_parameters.chunks_default_task_count} to {adaptive_chunk_size}", + extra={"session_id": self.session_id}, + ) + task_parameters.chunks_default_task_count = adaptive_chunk_size - else: - # In this case, we've finished all the Tasks and exited all the environments, - # so we can clean up and end - # else: + # If a maximum task count was specified, count them down + if maximum_tasks and maximum_tasks > 0: + maximum_tasks -= 1 + if maximum_tasks == 0: + break + + def run_step( + self, + step: Step, + task_parameters: Optional[Iterable[TaskParameterSet]] = None, + maximum_tasks: Optional[int] = None, + ) -> None: + """Run a step in the session. 
Optional parameters control which tasks to run.""" + if self._openjd_session.state != SessionState.READY: + raise RuntimeError( + f"Session must be in READY state, but is in {self._openjd_session.state.name}" + ) + + LOG.info( + msg=f"Open Job Description CLI: Running step '{step.name}'", + extra={"session_id": self.session_id}, + ) + + if task_parameters is None: + task_parameters = StepParameterSpaceIterator(space=step.parameterSpace) + + # Enter all the step environments + self.run_environment_enters(step.stepEnvironments, EnvironmentType.STEP) + + try: + # Run the tasks + if ( + isinstance(task_parameters, StepParameterSpaceIterator) + and task_parameters.chunks_adaptive + ): + self._run_tasks_adaptive_chunking(step, task_parameters, maximum_tasks) + else: + # Run without adaptive chunking + if maximum_tasks and maximum_tasks > 0: + task_parameters = islice(task_parameters, maximum_tasks) + + for parameter_set in task_parameters: + # This may raise a LocalSessionFailed exception + self.run_task(step, parameter_set) + finally: + # Exit all the step environments + self.run_environment_exits(type=EnvironmentType.STEP, keep_session_running=True) + + def cleanup(self) -> None: + if not self._cleanup_called: LOG.info( - msg="Open Job Description CLI: All actions completed successfully!", + msg="Open Job Description CLI: Local Session ended! 
Now cleaning up Session resources.", extra={"session_id": self.session_id}, ) - self._current_action = None + self._log_handler.close() + LOG.removeHandler(self._log_handler) + + self._openjd_session.cleanup() + self._cleanup_called = True - self._end_seconds = time.perf_counter() - self.ended.set() + @property + def failed_action(self) -> Optional[SessionAction]: + """The action that failed, if any.""" + return self._failed_action def cancel(self): LOG.info( @@ -292,10 +390,10 @@ def cancel(self): extra={"session_id": self.session_id}, ) - if self._inner_session.state == SessionState.RUNNING: + if self._openjd_session.state == SessionState.RUNNING: # The action will call self._action_callback when it has exited, # and that will exit the loop in self.run() - self._inner_session.cancel_action() + self._openjd_session.cancel_action() LOG.info( msg=f"Open Job Description CLI: Session terminated by user while running action: '{str(self._current_action)}'.", @@ -303,86 +401,14 @@ def cancel(self): ) self.failed = True - def get_duration(self) -> float: - if not self._start_seconds: - return 0 - elif not self._end_seconds: - return time.perf_counter() - self._start_seconds - return self._end_seconds - self._start_seconds - def get_log_messages(self) -> list[LogEntry]: return self._log_handler.messages - def _generate_task_parameter_set( - self, *, parameter_space: StepParameterSpace, parameter_values: dict - ) -> TaskParameterSet: - """ - Convert dictionary-formatted Task parameters into a TaskParameterSet that can - be used in a Session. - If any parameters are missing from the dictionary, we will default to using - the first value for that parameter defined in the Step's parameter space. - """ - - # For each parameter defined in the Step, assert that it appears - # with the correct type in each parameter set provided by the user. - # We compound each error into a log message so that the user - # can fix as many as possible at once. 
- - defined_names = set(parameter_space.taskParameterDefinitions.keys()) - provided_names = set(parameter_values.keys()) - - # First, check for extraneous parameters - extra_names = provided_names.difference(defined_names) - for name in extra_names: - LOG.info( - msg=f"Skipping unused parameter '{name}'", extra={"session_id": self.session_id} - ) - - # The first value in the parameter space iterator will hold the default value - # we use for each missing parameter - default_set = StepParameterSpaceIterator(space=parameter_space)[0] - - parameter_set = TaskParameterSet() - for name in defined_names: - # Note that parameter sets don't verify types, so any errors resulting from - # type mismatches will be raised when the inner Session attempts to use them. - if name in parameter_values: - parameter_set.update( - { - name: ParameterValue( - type=ParameterValueType( - parameter_space.taskParameterDefinitions[name].type - ), - value=f"{parameter_values[name]}", - ) - } - ) - else: - parameter_set.update( - { - name: ParameterValue( - type=ParameterValueType( - parameter_space.taskParameterDefinitions[name].type - ), - value=default_set[name].value, - ) - } - ) - - return parameter_set - def _action_callback(self, session_id: str, new_status: ActionStatus) -> None: if new_status.state == ActionState.SUCCESS: if isinstance(self._current_action, RunTaskAction): - self.tasks_run += 1 + self.task_run_count += 1 self._action_ended.set() if new_status.state in (ActionState.FAILED, ActionState.CANCELED, ActionState.TIMEOUT): self.failed = True self._action_ended.set() - - def _add_environments(self, envs: list) -> list[str]: - ids: list[str] = [] - for env in envs: - self._enter_env_queue.put(EnterEnvironmentAction(self._inner_session, env, env.name)) - ids.append(env.name) - return ids diff --git a/src/openjd/cli/_run/_run_command.py b/src/openjd/cli/_run/_run_command.py index daca60d..7c3b891 100644 --- a/src/openjd/cli/_run/_run_command.py +++ 
b/src/openjd/cli/_run/_run_command.py @@ -4,15 +4,23 @@ from dataclasses import dataclass from pathlib import Path import json -from typing import Optional +from typing import Iterable, Optional import re import logging +import time -from ._local_session._session_manager import LocalSession, LogEntry +from ._local_session._session_manager import ( + LocalSession, + LocalSessionFailed, + LogEntry, + LoggingTimestampFormat, +) from .._common import ( + add_extensions_argument, OpenJDCliResult, generate_job, get_params_from_file, + process_extensions_argument, print_cli_result, read_environment_template, ) @@ -21,7 +29,11 @@ EnvironmentTemplate, Job, Step, + StepDependencyGraph, StepParameterSpaceIterator, + ParameterValue, + ParameterValueType, + TaskParameterSet, ) from openjd.sessions import PathMappingRule, LOG @@ -33,30 +45,30 @@ class OpenJDRunResult(OpenJDCliResult): """ job_name: str - step_name: str + step_name: Optional[str] duration: float - tasks_run: int + chunks_run: int logs: list[LogEntry] def __str__(self) -> str: + step_message = "" + if self.step_name is not None: + step_message = f"Step: {self.step_name}\n" return f""" --- Results of local session --- {self.message} Job: {self.job_name} -Step: {self.step_name} -Duration: {self.duration} seconds -Tasks run: {self.tasks_run} +{step_message}Duration: {self.duration} seconds +Chunks run: {self.chunks_run} """ def add_run_arguments(run_parser: ArgumentParser): run_parser.add_argument( "--step", - action="store", type=str, - required=True, metavar="STEP_NAME", help="The name of the Step in the Job to run Tasks from.", ) @@ -65,7 +77,6 @@ def add_run_arguments(run_parser: ArgumentParser): "--task-param", "-tp", action="append", - type=str, dest="task_params", metavar="PARAM=VALUE", help=( @@ -77,9 +88,6 @@ def add_run_arguments(run_parser: ArgumentParser): ) group.add_argument( "--tasks", - action="store", - type=str, - dest="tasks", metavar='file://tasks.json OR file://tasks.yaml OR [{"Param": "Value1", 
...}, {"Param": "Value2", ...}]', help=( "This argument instructs the command to run one or more tasks/chunks of tasks for the Step in a Session. " @@ -89,7 +97,6 @@ def add_run_arguments(run_parser: ArgumentParser): ) group.add_argument( "--maximum-tasks", - action="store", type=int, default=-1, help=( @@ -100,14 +107,17 @@ def add_run_arguments(run_parser: ArgumentParser): ) run_parser.add_argument( "--run-dependencies", - action="store_const", - const=True, + action="store_true", help="Run the Step along with all of its transitive and direct dependencies.", ) + run_parser.add_argument( + "--no-run-dependencies", + action="store_false", + dest="run_dependencies", + help="Run the Step alone without dependencies.", + ) run_parser.add_argument( "--path-mapping-rules", - action="store", - type=str, help="The path mapping rules to apply to the template. Should be a path mapping definition according to " + "the 'pathmapping-1.0' schema. Can either be supplied as a string or as a path to a JSON/YAML document, " + "prefixed with 'file://'.", @@ -117,60 +127,54 @@ def add_run_arguments(run_parser: ArgumentParser): "--env", dest="environments", action="append", - type=str, metavar=" [] ...", help="Apply the given environments to the Session in the order given.", ) run_parser.add_argument( "--preserve", - action="store_const", - const=True, + action="store_true", default=False, help="Do not automatically delete the Session's Working Directory when complete.", ) run_parser.add_argument( "--verbose", - action="store_const", - const=True, + action="store_true", default=False, help="Enable verbose logging while running the Session.", ) + run_parser.add_argument( + "--timestamp-format", + choices=["relative", "local", "utc"], + default="relative", + help="How to format the log output timestamps when running the job.", + ) + add_extensions_argument(run_parser) -def _collect_required_steps(step_map: dict[str, Step], step: Step) -> list[Step]: +def 
_collect_dependency_steps(step_map: dict[str, Step], step: Step) -> list[Step]: """ Recursively traverses through a Step's dependencies to create an ordered list of - Steps to run in the local Session. + Steps to run in the local Session. Does not include the specified step. """ - if step.stepEnvironments: - # Currently, we only support running one local Session, so any Steps with Step-specific environments - # must not depend on/be a dependency for other Steps. - raise RuntimeError( - f"ERROR: Step '{step.name}' has Step-level environments and cannot be run in the same local Session as the other dependencies." - ) - if not step.dependencies: - return [step] - - required_steps: list[Step] = [] - - try: - for dep in step.dependencies: - dependency_name = dep.dependsOn - # Collect transitive dependencies in the recursive call, - # then remove duplicates - collected = _collect_required_steps(step_map, step_map[dependency_name]) - required_steps += [new_step for new_step in collected if new_step not in required_steps] - except KeyError: - # This should technically raise a validation error when creating a Job, - # but we check again here for thoroughness - raise RuntimeError( - f"ERROR: Dependency '{dependency_name}' in Step '{step.name}' is not an existing Step." 
- ) - - required_steps.append(step) + return [] + + dependency_steps: list[Step] = [] + visited_step_names: set[str] = set() + + for dep in step.dependencies: + dependency_name = dep.dependsOn + if dependency_name not in visited_step_names: + visited_step_names.add(dependency_name) + # Collect transitive dependencies in the recursive call, then filter any that were previously visited + transitive_deps = _collect_dependency_steps(step_map, step_map[dependency_name]) + dependency_steps.extend( + new_step for new_step in transitive_deps if new_step.name not in visited_step_names + ) + dependency_steps.append(step_map[dependency_name]) + visited_step_names.update(new_step.name for new_step in transitive_deps) - return required_steps + return dependency_steps def _process_task_params(arguments: list[str]) -> dict[str, str]: @@ -271,22 +275,15 @@ def _validate_task_params(step: Step, task_params: list[dict[str, str]]) -> None # For each task parameter set, verify: # 1) There are no parameters defined that don't exist in the template. # 2) That all parameters that are defined in the Step are defined in the parameter set. - # 3) [TODO] That the given parameter set is actually in the parameter space of the Step. - # - We need openjd.model.StepParameterSpaceIterator to have a membership test first to be able to do - # this last check. + # 3) That the given parameter set is actually in the parameter space of the Step. # Collect the names of all of the task parameters defined in the step. 
- if step.parameterSpace is not None: - param_space_iter = StepParameterSpaceIterator(space=step.parameterSpace) - task_parameter_names: set[str] = set(param_space_iter.names) - else: - task_parameter_names = set[str]() + param_space_iter = StepParameterSpaceIterator(space=step.parameterSpace) + task_parameter_names: set[str] = set(param_space_iter.names) error_list = list[str]() for i, parameter_set in enumerate(task_params): defined_params = set(parameter_set.keys()) - if defined_params == task_parameter_names: - continue extra_names = defined_params.difference(task_parameter_names) missing_names = task_parameter_names.difference(defined_params) if extra_names: @@ -297,6 +294,20 @@ def _validate_task_params(step: Step, task_params: list[dict[str, str]]) -> None error_list.append( f"Task {i} is missing values for parameters: {', '.join(sorted(missing_names))}" ) + if not (extra_names or missing_names): + params = { + name: ParameterValue( + type=ParameterValueType( + step.parameterSpace.taskParameterDefinitions[name].type # type: ignore + ), + value=parameter_set[name], + ) + for name in task_parameter_names + } + try: + param_space_iter.validate_containment(params) + except ValueError as e: + error_list.append(f"Task {i}: {e}") if error_list: error_msg = "Errors defining task parameter values:\n - " error_msg += "\n - ".join(error_list) @@ -306,13 +317,13 @@ def _validate_task_params(step: Step, task_params: list[dict[str, str]]) -> None def _run_local_session( *, job: Job, - step_map: dict[str, Step], - step: Step, + step_list: list[Step], + selected_step: Optional[Step], + timestamp_format: LoggingTimestampFormat, maximum_tasks: int = -1, - task_parameter_values: list[dict] = [], + task_parameter_values: Iterable[TaskParameterSet], environments: Optional[list[EnvironmentTemplate]] = None, path_mapping_rules: Optional[list[PathMappingRule]], - should_run_dependencies: bool = False, should_print_logs: bool = True, retain_working_dir: bool = False, ) -> 
OpenJDCliResult: @@ -320,56 +331,58 @@ def _run_local_session( Creates a Session object and listens for log messages to synchronously end the session. """ - dependencies: list[Step] = [] try: - if should_run_dependencies and step.dependencies: - # Raises: RuntimeError - dependencies = _collect_required_steps(step_map, step)[:-1] - except RuntimeError as rte: - return OpenJDCliResult(status="error", message=str(rte)) - - with LocalSession( - job=job, - session_id="sample_session", - path_mapping_rules=path_mapping_rules, - environments=environments, - should_print_logs=should_print_logs, - retain_working_dir=retain_working_dir, - ) as session: - session.initialize( - dependencies=dependencies, - step=step, - task_parameter_values=task_parameter_values, - maximum_tasks=maximum_tasks, - ) - session.run() - - # Monitor the local Session state - session.ended.wait() + start_seconds = time.perf_counter() + + step_name = "" + with LocalSession( + job=job, + timestamp_format=timestamp_format, + session_id="CLI-session", + path_mapping_rules=path_mapping_rules, + environments=[env.environment for env in environments] if environments else [], + should_print_logs=should_print_logs, + retain_working_dir=retain_working_dir, + ) as session: + for dep_step in step_list: + step_name = dep_step.name + session.run_step(dep_step) + if selected_step: + step_name = selected_step.name + session.run_step( + selected_step, + task_parameters=task_parameter_values, + maximum_tasks=maximum_tasks, + ) + duration = time.perf_counter() - start_seconds + except LocalSessionFailed: + duration = time.perf_counter() - start_seconds + session = None preserved_message: str = "" - if retain_working_dir: + if retain_working_dir and session is not None: preserved_message = ( - f"\nWorking directory preserved at: {str(session._inner_session.working_directory)}" + f"\nWorking directory preserved at: {str(session._openjd_session.working_directory)}" ) - if session.failed: + + if session is None or 
session.failed: return OpenJDRunResult( status="error", message="Session ended with errors; see Task logs for details" + preserved_message, job_name=job.name, - step_name=step.name, - duration=session.get_duration(), - tasks_run=session.tasks_run, - logs=session.get_log_messages(), + step_name=step_name, + duration=duration, + chunks_run=0 if session is None else session.task_run_count, + logs=[] if session is None else session.get_log_messages(), ) return OpenJDRunResult( status="success", message="Session ended successfully" + preserved_message, job_name=job.name, - step_name=step.name, - duration=session.get_duration(), - tasks_run=session.tasks_run, + step_name=selected_step.name if selected_step else None, + duration=duration, + chunks_run=session.task_run_count, logs=session.get_log_messages(), ) @@ -385,6 +398,8 @@ def do_run(args: Namespace) -> OpenJDCliResult: sets in sequence. """ + extensions = process_extensions_argument(args.extensions) + environments: list[EnvironmentTemplate] = [] if args.environments: for env in args.environments: @@ -404,17 +419,22 @@ def do_run(args: Namespace) -> OpenJDCliResult: parsed_rules = json.load(f) else: parsed_rules = json.loads(args.path_mapping_rules) + if not isinstance(parsed_rules, dict): + return OpenJDCliResult( + status="error", + message="Path mapping rules must be an object with 'version' and 'path_mapping_rules' fields", + ) if parsed_rules.get("version", None) != "pathmapping-1.0": return OpenJDCliResult( status="error", message="Path mapping rules must have a 'version' value of 'pathmapping-1.0'", ) - if not isinstance(parsed_rules.get("path_mapping_rules", None), list): + rules_list = parsed_rules.get("path_mapping_rules") + if not isinstance(rules_list, list): return OpenJDCliResult( status="error", message="Path mapping rules must contain a list named 'path_mapping_rules'", ) - rules_list = parsed_rules.get("path_mapping_rules") path_mapping_rules = [PathMappingRule.from_dict(rule) for rule in 
rules_list] if args.verbose: @@ -422,15 +442,29 @@ def do_run(args: Namespace) -> OpenJDCliResult: try: # Raises: RuntimeError - the_job = generate_job(args) + the_job = generate_job(args, supported_extensions=extensions) # Map Step names to Step objects so they can be easily accessed step_map = {step.name: step for step in the_job.steps} - if args.step not in step_map: - raise RuntimeError( - f"No Step with name '{args.step}' is defined in the given Job Template." - ) + if args.step is not None: + # If a step name was provided + selected_step = step_map.get(args.step) + if selected_step is None: + raise RuntimeError( + f"No Step with name '{args.step}' is defined in the given Job Template." + ) + else: + if len(the_job.steps) == 1: + # If the job has only one step, act as if its name was provided + selected_step = the_job.steps[0] + else: + selected_step = None + if args.task_params or args.tasks: + raise RuntimeError( + "Providing task parameters requires a specified step or a job with a single step.\n" + + f"{len(the_job.steps)} steps: {[step.name for step in the_job.steps]}." 
+ ) task_params: list[dict[str, str]] = [] if args.task_params: @@ -438,20 +472,50 @@ def do_run(args: Namespace) -> OpenJDCliResult: elif args.tasks: task_params = _process_tasks(args.tasks) - _validate_task_params(step_map[args.step], task_params) + if selected_step and task_params: + _validate_task_params(selected_step, task_params) + + task_parameter_values: Iterable[TaskParameterSet] = [ + { + name: ParameterValue( + type=ParameterValueType( + selected_step.parameterSpace.taskParameterDefinitions[name].type # type: ignore + ), + value=value, + ) + for name, value in params.items() + } + for params in task_params + ] + elif selected_step: + task_parameter_values = StepParameterSpaceIterator(space=selected_step.parameterSpace) + else: + task_parameter_values = [] + + except RuntimeError as rte: + return OpenJDCliResult(status="error", message=str(rte)) + step_list: list[Step] = [] + try: + if selected_step is None: + # If no step was selected, topologically sort and run all the steps + step_graph = StepDependencyGraph(job=the_job) + step_list = step_graph.topo_sorted() + elif args.run_dependencies and selected_step.dependencies: + # Collect the dependencies of the selected step + step_list = _collect_dependency_steps(step_map, selected_step) except RuntimeError as rte: return OpenJDCliResult(status="error", message=str(rte)) return _run_local_session( job=the_job, - step_map=step_map, - step=step_map[args.step], - task_parameter_values=task_params, + step_list=step_list, + selected_step=selected_step, + task_parameter_values=task_parameter_values, + timestamp_format=LoggingTimestampFormat(args.timestamp_format), maximum_tasks=args.maximum_tasks, environments=environments, path_mapping_rules=path_mapping_rules, - should_run_dependencies=(args.run_dependencies), should_print_logs=(args.output == "human-readable"), retain_working_dir=args.preserve, ) diff --git a/src/openjd/cli/_summary/_summary_command.py b/src/openjd/cli/_summary/_summary_command.py index 
dd00721..227c81b 100644 --- a/src/openjd/cli/_summary/_summary_command.py +++ b/src/openjd/cli/_summary/_summary_command.py @@ -4,8 +4,10 @@ from ._summary_output import output_summary_result from .._common import ( + add_extensions_argument, OpenJDCliResult, generate_job, + process_extensions_argument, print_cli_result, ) @@ -21,6 +23,7 @@ def add_summary_arguments(summary_parser: ArgumentParser) -> None: metavar="STEP_NAME", help="Prints information about the Step with this name within the Job Template.", ) + add_extensions_argument(summary_parser) @print_cli_result @@ -28,9 +31,11 @@ def do_summary(args: Namespace) -> OpenJDCliResult: """ Given a Job Template and applicable parameters, generates a Job and outputs information about it. """ + extensions = process_extensions_argument(args.extensions) + try: # Raises: RuntimeError - sample_job = generate_job(args) + sample_job = generate_job(args, supported_extensions=extensions) except RuntimeError as rte: return OpenJDCliResult(status="error", message=str(rte)) diff --git a/src/openjd/cli/_summary/_summary_output.py b/src/openjd/cli/_summary/_summary_output.py index 125f3f1..2092fc9 100644 --- a/src/openjd/cli/_summary/_summary_output.py +++ b/src/openjd/cli/_summary/_summary_output.py @@ -26,7 +26,7 @@ def _format_summary_list(data: list, padding: int = 0) -> str: """ formatted_list: str = "" for item in data: - formatted_list += "\t" * padding + f"- {str(item)}\n" + formatted_list += " " * padding + f"- {str(item)}\n" return formatted_list @@ -64,7 +64,7 @@ class EnvironmentSummary: def __str__(self) -> str: readable_string = f"{self.name} (from '{self.parent}')" if self.description: - readable_string += f"\n\t{self.description}" + readable_string += f"\n {self.description}" return readable_string @@ -96,18 +96,18 @@ class StepSummary: dependencies: Optional[list[DependencySummary]] def __str__(self) -> str: - summary_str = f"'{self.name}'\n" + summary_str = f"'{self.name}' ({self.total_tasks} total Tasks)\n" if 
self.parameter_definitions: - summary_str += f"\t{len(self.parameter_definitions)} Task parameter(s)\n" - - summary_str += f"\t{self.total_tasks} total Tasks\n" + summary_str += ( + f" Task parameters:\n{_format_summary_list(self.parameter_definitions, padding=4)}" + ) if self.environments: - summary_str += f"\t{len(self.environments)} environments\n" + summary_str += f" {len(self.environments)} environments\n" if self.dependencies: - summary_str += f"\t{len(self.dependencies)} dependencies\n" + summary_str += f" {len(self.dependencies)} dependencies\n" return summary_str @@ -132,7 +132,7 @@ def __str__(self) -> str: # For each parameter, print its name and its value (may be default or user-provided) if self.parameter_definitions: summary_str += ( - f"\nParameters:\n{_format_summary_list(self.parameter_definitions, padding=1)}" + f"\nParameters:\n{_format_summary_list(self.parameter_definitions, padding=2)}" ) summary_str += f""" @@ -148,11 +148,11 @@ def __str__(self) -> str: if self.total_environments: summary_str += f"\n--- Environments in '{self.name}' ---\n" if self.root_environments: - summary_str += _format_summary_list(self.root_environments) + summary_str += _format_summary_list(self.root_environments, padding=2) for step in self.steps: if step.environments: - summary_str += _format_summary_list(step.environments) + summary_str += _format_summary_list(step.environments, padding=2) return summary_str @@ -209,10 +209,12 @@ def _get_step_summary(step: Step) -> StepSummary: parameter_definitions = _populate_summary_list( [(name, param) for name, param in step.parameterSpace.taskParameterDefinitions.items()], lambda param_tuple: ParameterSummary( - name=param_tuple[0], description=None, type=param_tuple[1].type.name, value=None + name=param_tuple[0], description=None, type=param_tuple[1].type.value, value=None ), ) - total_tasks = len(StepParameterSpaceIterator(space=step.parameterSpace)) + total_tasks = len( + 
StepParameterSpaceIterator(space=step.parameterSpace, chunks_task_count_override=1) + ) environments = [] if step.stepEnvironments: diff --git a/test/openjd/cli/__init__.py b/test/openjd/cli/__init__.py index a8eaba8..913b47b 100644 --- a/test/openjd/cli/__init__.py +++ b/test/openjd/cli/__init__.py @@ -2,122 +2,42 @@ import pytest from enum import Enum +from typing import Any +from pathlib import Path + +import yaml + +from openjd.model import ParameterValue, ParameterValueType +from openjd.cli import main as openjd_cli_main + + +def run_openjd_cli_main(capsys, *, args: list[str], expected_exit_code: int) -> Any: + """Wraps the logic to run the OpenJD CLI within a capsys environment""" + try: + openjd_cli_main(args) + exit_code = 0 + except SystemExit as e: + exit_code = e.code # type: ignore + + outerr = capsys.readouterr() + + assert ( + exit_code == expected_exit_code + ), f"Expected exit code {expected_exit_code}, but got {exit_code}:\n{format_capsys_outerr(outerr)}" + + return outerr + + +def format_capsys_outerr(outerr: Any) -> str: + """Formats the capsys stdout and stderr to insert in an assertion message""" + return f"\nstdout:\n{outerr.out}\nstderr:\n{outerr.err}" + # Catch-all sample template with different cases per Step -MOCK_TEMPLATE = { - "specificationVersion": "jobtemplate-2023-09", - "name": "my-job", - "parameterDefinitions": [{"name": "Message", "type": "STRING", "default": "Hello, world!"}], - "jobEnvironments": [{"name": "rootEnv", "variables": {"rootVar": "rootVal"}}], - "steps": [ - # VALID STEPS - { - # Basic step; uses Job parameters and has an environment - "name": "NormalStep", - "script": { - "actions": { - "onRun": {"command": "python", "args": ["-c", "print('{{Param.Message}}')"]} - } - }, - "stepEnvironments": [ - { - "name": "env1", - "script": { - "actions": { - "onEnter": {"command": "python", "args": ["-c", "print('EnteringEnv')"]} - } - }, - } - ], - }, - { - # Step that will wait for one minute before completing its Task - 
"name": "LongCommand", - "script": {"actions": {"onRun": {"command": "sleep", "args": ["60"]}}}, - }, - { - # Step with the bare minimum information, i.e., no Task parameters, environments, or dependencies - "name": "BareStep", - "script": {"actions": {"onRun": {"command": "python", "args": ["-c", "print('zzz')"]}}}, - }, - { - # Step with a direct dependency on a previous Step - "name": "DependentStep", - "script": { - "actions": { - "onRun": {"command": "python", "args": ["-c", "print('I am dependent!')"]} - } - }, - "dependencies": [{"dependsOn": "BareStep"}], - }, - { - # Step with Task parameters - "name": "TaskParamStep", - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "TaskNumber", "type": "INT", "range": [1, 2, 3]}, - {"name": "TaskMessage", "type": "STRING", "range": ["Hi!", "Bye!"]}, - ] - }, - "script": { - "actions": { - "onRun": { - "command": "python", - "args": [ - "-c", - "print('{{Task.Param.TaskNumber}}.{{Task.Param.TaskMessage}}')", - ], - } - } - }, - }, - { - # Step with a transitive dependency and a direct dependency - "name": "ExtraDependentStep", - "script": { - "actions": { - "onRun": {"command": "python", "args": ["-c", "print('I am extra dependent!')"]} - } - }, - "dependencies": [{"dependsOn": "DependentStep"}, {"dependsOn": "TaskParamStep"}], - }, - { - # Step with dependencies and Task parameters - "name": "DependentParamStep", - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "Adjective", "type": "STRING", "range": ["really", "very", "super"]} - ], - }, - "script": { - "actions": { - "onRun": { - "command": "python", - "args": ["-c", "print('I am {{Task.Param.Adjective}} dependent!')"], - } - } - }, - "dependencies": [{"dependsOn": "TaskParamStep"}], - }, - # ERROR STEPS - { - # Step with a non-existent command that will throw an error when run - "name": "BadCommand", - "script": {"actions": {"onRun": {"command": "aaaaaaaaaa"}}}, - }, - { - # Step with a dependency it can't run in the same Session with 
- "name": "ShouldSeparateSession", - "script": { - "actions": { - "onRun": {"command": "python", "args": ["-c", "print('I do not belong here!')"]} - } - }, - "dependencies": [{"dependsOn": "NormalStep"}], - }, - ], -} +MOCK_TEMPLATE = yaml.safe_load( + (Path(__file__).parent / "templates" / "job_with_test_steps.yaml").read_text(encoding="utf8") +) # Map of Step names to Step indices for more readable test cases @@ -130,8 +50,8 @@ class SampleSteps(int, Enum): TaskParamStep = 4 ExtraDependentStep = 5 DependentParamStep = 6 - BadCommand = 7 - ShouldSeparateSession = 8 + StepDepHasStepEnv = 7 + BadCommand = 8 # Sample dictionaries for tests using Job parameters @@ -176,85 +96,87 @@ class SampleSteps(int, Enum): # Shared parameters for `LocalSession` tests to be used with `@pytest.mark.parametrize` SESSION_PARAMETERS = ( - "dependency_indexes,step_index,maximum_tasks, parameter_sets,num_expected_environments,num_expected_tasks", + "step_index,maximum_tasks, parameter_sets,num_expected_tasks", [ - pytest.param([], SampleSteps.NormalStep, -1, None, 2, 1, id="Basic step"), + pytest.param(SampleSteps.NormalStep, -1, None, 1, id="Basic step"), pytest.param( - [SampleSteps.BareStep], SampleSteps.DependentStep, -1, None, 1, - 2, id="Direct dependency", ), pytest.param( - [ - SampleSteps.BareStep, - SampleSteps.DependentStep, - SampleSteps.TaskParamStep, - ], SampleSteps.ExtraDependentStep, -1, None, 1, - 9, id="Dependencies and Task parameters", ), pytest.param( - [SampleSteps.BareStep, SampleSteps.DependentStep, SampleSteps.TaskParamStep], SampleSteps.ExtraDependentStep, 1, None, 1, - 9, id="No maximum on dependencies' Tasks", ), + pytest.param(SampleSteps.TaskParamStep, 1, None, 1, id="Limit on maximum Task parameters"), pytest.param( - [], SampleSteps.TaskParamStep, 1, None, 1, 1, id="Limit on maximum Task parameters" - ), - pytest.param( - [], SampleSteps.TaskParamStep, 100, None, - 1, 6, id="Maximum Task parameters more than defined", ), pytest.param( - [], 
SampleSteps.TaskParamStep, -1, - [{"TaskNumber": 5}, {"TaskMessage": "Hello!"}], - 1, + [ + { + "TaskNumber": ParameterValue(type=ParameterValueType.INT, value="2"), + "TaskMessage": ParameterValue(type=ParameterValueType.STRING, value="Bye!"), + }, + { + "TaskNumber": ParameterValue(type=ParameterValueType.INT, value="1"), + "TaskMessage": ParameterValue(type=ParameterValueType.STRING, value="Hi!"), + }, + ], 2, id="Custom parameter sets", ), pytest.param( - [], SampleSteps.BareStep, -1, - [{"Why": "Am"}, {"I": "Here"}], - 1, - 1, + [ + {"Why": ParameterValue(type=ParameterValueType.STRING, value="Am")}, + {"I": ParameterValue(type=ParameterValueType.STRING, value="Here")}, + ], + 2, id="Task parameters for step not requiring them", ), pytest.param( - [SampleSteps.TaskParamStep], SampleSteps.DependentParamStep, -1, - [{"Adjective": "extremely"}, {"Adjective": "most"}], - 1, - 8, + [ + {"Adjective": ParameterValue(type=ParameterValueType.STRING, value="extremely")}, + {"Adjective": ParameterValue(type=ParameterValueType.STRING, value="most")}, + ], + 2, id="Custom Task parameters not applied to dependency", ), pytest.param( - [], SampleSteps.TaskParamStep, 1, - [{"TaskMessage": "Hello!"}, {"TaskMessage": "Extra message!"}], - 1, + [ + { + "TaskNumber": ParameterValue(type=ParameterValueType.INT, value="2"), + "TaskMessage": ParameterValue(type=ParameterValueType.STRING, value="Hi!"), + }, + { + "TaskNumber": ParameterValue(type=ParameterValueType.INT, value="2"), + "TaskMessage": ParameterValue(type=ParameterValueType.STRING, value="Bye!"), + }, + ], 1, id="Maximum Tasks less than number of parameter sets", ), diff --git a/test/openjd/cli/templates/basic.yaml b/test/openjd/cli/templates/basic.yaml new file mode 100644 index 0000000..706f0a0 --- /dev/null +++ b/test/openjd/cli/templates/basic.yaml @@ -0,0 +1,51 @@ +specificationVersion: "jobtemplate-2023-09" +name: Job +parameterDefinitions: + - name: J + type: STRING +jobEnvironments: + - name: J1 + script: + 
actions:
+      onEnter:
+        command: python
+        args: ["-c", "print('J1 Enter')"]
+      onExit:
+        command: python
+        args: ["-c", "print('J1 Exit')"]
+  - name: J2
+    script:
+      actions:
+        onEnter:
+          command: python
+          args: ["-c", "print('J2 Enter')"]
+        onExit:
+          command: python
+          args: ["-c", "print('J2 Exit')"]
+steps:
+  - name: First
+    parameterSpace:
+      taskParameterDefinitions:
+        - name: Foo
+          type: INT
+          range: "1"
+        - name: Bar
+          type: STRING
+          range: ["Bar1", "Bar2"]
+    stepEnvironments:
+      - name: FirstS
+        script:
+          actions:
+            onEnter:
+              command: python
+              args: ["-c", "print('FirstS Enter')"]
+            onExit:
+              command: python
+              args: ["-c", "print('FirstS Exit')"]
+    script:
+      actions:
+        onRun:
+          command: python
+          args:
+            - "-c"
+            - "print('J={{Param.J}} Foo={{Task.Param.Foo}}. Bar={{Task.Param.Bar}}')"
diff --git a/test/openjd/cli/templates/basic_dependency_job.yaml b/test/openjd/cli/templates/basic_dependency_job.yaml
new file mode 100644
index 0000000..144fbac
--- /dev/null
+++ b/test/openjd/cli/templates/basic_dependency_job.yaml
@@ -0,0 +1,53 @@
+specificationVersion: jobtemplate-2023-09
+name: Job
+parameterDefinitions:
+- name: J
+  type: STRING
+jobEnvironments:
+- name: J1
+  script:
+    actions:
+      onEnter:
+        command: python
+        args:
+        - -c
+        - print('J1 Enter')
+      onExit:
+        command: python
+        args:
+        - -c
+        - print('J1 Exit')
+steps:
+- name: First
+  parameterSpace:
+    taskParameterDefinitions:
+    - name: Foo
+      type: INT
+      range: '1'
+    - name: Bar
+      type: STRING
+      range:
+      - Bar1
+      - Bar2
+  script:
+    actions:
+      onRun:
+        command: python
+        args:
+        - -c
+        - print('J={{Param.J}} Foo={{Task.Param.Foo}}. 
Bar={{Task.Param.Bar}}') +- name: Second + dependencies: + - dependsOn: First + parameterSpace: + taskParameterDefinitions: + - name: Fuz + type: INT + range: 1-2 + script: + actions: + onRun: + command: python + args: + - -c + - print('J={{Param.J}} Fuz={{Task.Param.Fuz}}.') \ No newline at end of file diff --git a/test/openjd/cli/templates/chunked_job.yaml b/test/openjd/cli/templates/chunked_job.yaml new file mode 100644 index 0000000..d382546 --- /dev/null +++ b/test/openjd/cli/templates/chunked_job.yaml @@ -0,0 +1,79 @@ +specificationVersion: 'jobtemplate-2023-09' +extensions: + - TASK_CHUNKING +name: Chunked Job + +parameterDefinitions: +- name: Items + type: STRING + default: 1-40 +- name: ChunkSize + type: INT + default: 10 +- name: TargetRuntime + type: INT + default: 0 +- name: TaskSleepTime + type: FLOAT + default: 0.01 + +steps: +- name: Chunked Step + + parameterSpace: + taskParameterDefinitions: + - name: Item + type: CHUNK[INT] + range: "{{Param.Items}}" + chunks: + defaultTaskCount: "{{Param.ChunkSize}}" + targetRuntimeSeconds: "{{Param.TargetRuntime}}" + rangeConstraint: NONCONTIGUOUS + + script: + actions: + onRun: + command: bash + args: ['{{Task.File.Run}}'] + embeddedFiles: + - name: GetSleepTime + type: TEXT + filename: get_sleep_time.py + data: | + """ + Converts an Open Job Description range expression into a sleep time according to the job parameters. + * https://github.com/OpenJobDescription/openjd-specifications/wiki/2023-09-Template-Schemas#34111-intrangeexpr + """ + + import sys, re + + task_sleep_time = float(sys.argv[2]) + + def range_expr_to_list(range_expr): + # Regex that matches "", "-", or "-:" + int_pat = r"\s*(-?[0-9]+)\s*" + part_re = re.compile(f"^{int_pat}(?:-{int_pat}(?::{int_pat})?)?$") + result = [] + for part in range_expr.split(","): + if m := part_re.match(part): + start, end, step = m.groups() + if step is not None: + # Linear sequence "3-7:2" means the values [3, 5, 7]. 
+ result.extend(range(int(start), int(end) + (int(step)//abs(int(step))), int(step))) + elif end is not None: + # Interval "3-6" means the values [3, 4, 5, 6]. + result.extend(range(int(start), int(end) + 1)) + else: + # Integer "3" means the values [3]. + result.append(int(start)) + else: + raise ValueError(f"Invalid frame range expression: {range_expr}") + return result + + print(task_sleep_time * len(range_expr_to_list(sys.argv[1]))) + - name: Run + type: TEXT + data: | + set -xeuo pipefail + + sleep "$(python '{{Task.File.GetSleepTime}}' '{{Task.Param.Item}}' '{{Param.TaskSleepTime}}')" diff --git a/test/openjd/cli/templates/env_1.yaml b/test/openjd/cli/templates/env_1.yaml new file mode 100644 index 0000000..94db703 --- /dev/null +++ b/test/openjd/cli/templates/env_1.yaml @@ -0,0 +1,15 @@ +specificationVersion: environment-2023-09 +environment: + name: Env1 + script: + actions: + onEnter: + command: python + args: + - -c + - print('Env1 Enter') + onExit: + command: python + args: + - -c + - print('Env1 Exit') diff --git a/test/openjd/cli/templates/env_2.yaml b/test/openjd/cli/templates/env_2.yaml new file mode 100644 index 0000000..975a40a --- /dev/null +++ b/test/openjd/cli/templates/env_2.yaml @@ -0,0 +1,15 @@ +specificationVersion: environment-2023-09 +environment: + name: Env2 + script: + actions: + onEnter: + command: python + args: + - -c + - print('Env2 Enter') + onExit: + command: python + args: + - -c + - print('Env2 Exit') diff --git a/test/openjd/cli/templates/env_fails_enter.yaml b/test/openjd/cli/templates/env_fails_enter.yaml new file mode 100644 index 0000000..e233a20 --- /dev/null +++ b/test/openjd/cli/templates/env_fails_enter.yaml @@ -0,0 +1,15 @@ +specificationVersion: environment-2023-09 +environment: + name: EnvEnterFail + script: + actions: + onEnter: + command: python + args: + - -c + - import sys; print('EnvEnterFail Enter'); sys.exit(1) + onExit: + command: python + args: + - -c + - print('EnvEnterFail Exit') diff --git 
a/test/openjd/cli/templates/env_fails_exit.yaml b/test/openjd/cli/templates/env_fails_exit.yaml new file mode 100644 index 0000000..47bc416 --- /dev/null +++ b/test/openjd/cli/templates/env_fails_exit.yaml @@ -0,0 +1,15 @@ +specificationVersion: environment-2023-09 +environment: + name: EnvExitFail + script: + actions: + onEnter: + command: python + args: + - -c + - print('EnvExitFail Enter') + onExit: + command: python + args: + - -c + - import sys; print('EnvExitFail Exit'); sys.exit(1) diff --git a/test/openjd/cli/templates/job_sleep_exit_normal.yaml b/test/openjd/cli/templates/job_sleep_exit_normal.yaml new file mode 100644 index 0000000..f0f5e2f --- /dev/null +++ b/test/openjd/cli/templates/job_sleep_exit_normal.yaml @@ -0,0 +1,23 @@ +{ + "specificationVersion": "jobtemplate-2023-09", + "name": "TimeoutTest", + "parameterDefinitions": [{"name": "J", "type": "STRING"}], + "steps": [ + { + "name": "Timeout", + "script": { + "actions": { + "onRun": { + "command": "python", + "args": [ + "-c", + # Obfuscate "EXIT_NORMAL" so it doesn't appear in the log when Windows prints the command that's run to the log. + "import time,sys; print('SLEEP'); sys.stdout.flush(); time.sleep(5); print(chr(69)+'XIT_NORMAL')", + ], + "timeout": 2, + } + } + }, + } + ], + } \ No newline at end of file diff --git a/test/openjd/cli/templates/job_with_test_steps.yaml b/test/openjd/cli/templates/job_with_test_steps.yaml new file mode 100644 index 0000000..cecf418 --- /dev/null +++ b/test/openjd/cli/templates/job_with_test_steps.yaml @@ -0,0 +1,130 @@ +# Catch-all sample template with different cases per Step +specificationVersion: jobtemplate-2023-09 +name: my-job +parameterDefinitions: +- name: Message + type: STRING + default: Hello, world! 
+jobEnvironments: +- name: rootEnv + variables: + rootVar: rootVal +steps: +# VALID STEPS +# Basic step; uses Job parameters and has an environment +- name: NormalStep + script: + actions: + onRun: + command: python + args: + - -c + - print('{{Param.Message}}') + stepEnvironments: + - name: env1 + script: + actions: + onEnter: + command: python + args: + - -c + - print('EnteringEnv') +# Step that will wait for one minute before completing its Task +- name: LongCommand + script: + actions: + onRun: + command: sleep + args: + - '60' +# Step with the bare minimum information, i.e., no Task parameters, environments, or dependencies +- name: BareStep + script: + actions: + onRun: + command: python + args: + - -c + - print('zzz') +# Step with a direct dependency on a previous Step +- name: DependentStep + script: + actions: + onRun: + command: python + args: + - -c + - print('I am dependent!') + dependencies: + - dependsOn: BareStep +# Step with Task parameters +- name: TaskParamStep + parameterSpace: + taskParameterDefinitions: + - name: TaskNumber + type: INT + range: + - 1 + - 2 + - 3 + - name: TaskMessage + type: STRING + range: + - Hi! + - Bye! 
+ script: + actions: + onRun: + command: python + args: + - -c + - print('{{Task.Param.TaskNumber}}.{{Task.Param.TaskMessage}}') +# Step with a transitive dependency and a direct dependency +- name: ExtraDependentStep + script: + actions: + onRun: + command: python + args: + - -c + - print('I am extra dependent!') + dependencies: + - dependsOn: DependentStep + - dependsOn: TaskParamStep +# Step with dependencies and Task parameters +- name: DependentParamStep + parameterSpace: + taskParameterDefinitions: + - name: Adjective + type: STRING + range: + - really + - very + - super + script: + actions: + onRun: + command: python + args: + - -c + - print('I am {{Task.Param.Adjective}} dependent!') + dependencies: + - dependsOn: TaskParamStep +# Step whose dependency has a step environment +- name: StepDepHasStepEnv + script: + actions: + onRun: + command: python + args: + - -c + - print('I have a dependency with a step environment!') + dependencies: + - dependsOn: NormalStep +# ERROR STEPS +# Step with a non-existent command that will throw an error when run +- name: BadCommand + script: + actions: + onRun: + command: aaaaaaaaaa diff --git a/test/openjd/cli/templates/simple_with_j_param.yaml b/test/openjd/cli/templates/simple_with_j_param.yaml new file mode 100644 index 0000000..a5ce128 --- /dev/null +++ b/test/openjd/cli/templates/simple_with_j_param.yaml @@ -0,0 +1,18 @@ +{ + "specificationVersion": "jobtemplate-2023-09", + "name": "Test", + "parameterDefinitions": [{"name": "J", "type": "STRING"}], + "steps": [ + { + "name": "SimpleStep", + "script": { + "actions": { + "onRun": { + "command": "python", + "args": ["-c", "print('DoTask')"], + } + } + }, + } + ], +} \ No newline at end of file diff --git a/test/openjd/cli/templates/simple_with_j_param_exit_1.yaml b/test/openjd/cli/templates/simple_with_j_param_exit_1.yaml new file mode 100644 index 0000000..73fbc15 --- /dev/null +++ b/test/openjd/cli/templates/simple_with_j_param_exit_1.yaml @@ -0,0 +1,18 @@ +{ + 
"specificationVersion": "jobtemplate-2023-09", + "name": "Test", + "parameterDefinitions": [{"name": "J", "type": "STRING"}], + "steps": [ + { + "name": "SimpleStep", + "script": { + "actions": { + "onRun": { + "command": "python", + "args": ["-c", "import sys; print('DoTask'); sys.exit(1)"] + } + } + } + } + ] +} diff --git a/test/openjd/cli/test_check_command.py b/test/openjd/cli/test_check_command.py index 1c93fc0..4f9ce73 100644 --- a/test/openjd/cli/test_check_command.py +++ b/test/openjd/cli/test_check_command.py @@ -28,7 +28,7 @@ def test_do_check_file_success(tempfile_extension: str, doc_serializer: Callable ) as temp_template: doc_serializer(MOCK_TEMPLATE, temp_template.file) - mock_args = Namespace(path=Path(temp_template.name), output="human-readable") + mock_args = Namespace(path=Path(temp_template.name), output="human-readable", extensions="") do_check(mock_args) Path(temp_template.name).unlink() @@ -41,7 +41,7 @@ def test_do_check_file_error(): in this case we just test an incorrect filename that gets handled in read_template) """ - mock_args = Namespace(path=Path("error-file.json"), output="human-readable") + mock_args = Namespace(path=Path("error-file.json"), output="human-readable", extensions="") with pytest.raises(SystemExit): do_check(mock_args) @@ -51,6 +51,6 @@ def test_do_check_bundle_error(): Test that passing a bundle with no template file yields a SystemError """ with tempfile.TemporaryDirectory() as temp_bundle: - mock_args = Namespace(path=Path(temp_bundle), output="human-readable") + mock_args = Namespace(path=Path(temp_bundle), output="human-readable", extensions="") with pytest.raises(SystemExit): do_check(mock_args) diff --git a/test/openjd/cli/test_chunked_job.py b/test/openjd/cli/test_chunked_job.py new file mode 100644 index 0000000..49b4aae --- /dev/null +++ b/test/openjd/cli/test_chunked_job.py @@ -0,0 +1,192 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. 
+ +from pathlib import Path +import re + +import pytest + +from . import run_openjd_cli_main, format_capsys_outerr + +CHUNKED_JOB_TEMPLATE_FILE = str(Path(__file__).parent / "templates" / "chunked_job.yaml") + + +@pytest.mark.parametrize( + "cli_options, expected_exit_code, expected_message_regex_list", + [ + ([], 0, [r"Template at '.*[\\/]chunked_job.yaml' passes validation checks."]), + (["--extensions", ""], 1, ["Unsupported extension names: TASK_CHUNKING"]), + ( + ["--extensions", "TASK_CHUNKING"], + 0, + [r"Template at '.*[\\/]chunked_job.yaml' passes validation checks."], + ), + ], +) +def test_openjd_check_on_chunked_job( + capsys, cli_options: list[str], expected_exit_code: int, expected_message_regex_list: list[str] +) -> None: + # Test that "openjd check" validates the chunked_job.yaml appropriately + + outerr = run_openjd_cli_main( + capsys, + args=["check", CHUNKED_JOB_TEMPLATE_FILE, *cli_options], + expected_exit_code=expected_exit_code, + ) + + for expected_message_regex in expected_message_regex_list: + assert re.search( + expected_message_regex, outerr.out + ), f"Regex r'{expected_message_regex}' not matched in:\n{format_capsys_outerr(outerr)}" + + +def test_openjd_summary_on_chunked_job(capsys): + # Test that "openjd summary" prints out correct information for chunked_job.yaml + + expected_message_regex_list = [ + "Summary for 'Chunked Job'", + "Total tasks: 40", + r"1. 
'Chunked Step' \(40 total Tasks\)",
+        r"Item \(CHUNK\[INT\]\)",
+    ]
+
+    outerr = run_openjd_cli_main(
+        capsys, args=["summary", CHUNKED_JOB_TEMPLATE_FILE], expected_exit_code=0
+    )
+
+    for expected_message_regex in expected_message_regex_list:
+        assert re.search(
+            expected_message_regex, outerr.out, re.MULTILINE
+        ), f"Regex r'{expected_message_regex}' not matched in:\n{format_capsys_outerr(outerr)}"
+
+
+def test_openjd_run_on_chunked_job_default_options(capsys):
+    # Test that "openjd run" runs the chunked_job.yaml with the expected chunks
+
+    expected_message_regex_list = [
+        r"Item\(CHUNK\[INT\]\) = 1-10$",
+        r"Item\(CHUNK\[INT\]\) = 11-20$",
+        r"Item\(CHUNK\[INT\]\) = 21-30$",
+        r"Item\(CHUNK\[INT\]\) = 31-40$",
+        "Chunks run: 4$",
+    ]
+
+    outerr = run_openjd_cli_main(
+        capsys,
+        args=["run", CHUNKED_JOB_TEMPLATE_FILE, "--step", "Chunked Step"],
+        expected_exit_code=0,
+    )
+
+    for expected_message_regex in expected_message_regex_list:
+        assert re.search(
+            expected_message_regex, outerr.out, re.MULTILINE
+        ), f"Regex r'{expected_message_regex}' not matched in:\n{format_capsys_outerr(outerr)}"
+
+
+def test_openjd_run_on_chunked_job_adaptive_chunking(capsys):
+    # Test that running chunked_job.yaml with adaptive chunking and the TargetRuntime cranked really high
+    # results in two chunks, the first being one task, and the second being the remainder. 
+ + expected_message_regex_list = [ + r"Item\(CHUNK\[INT\]\) = 1$", + r"Item\(CHUNK\[INT\]\) = 2-40$", + "Chunks run: 2$", + ] + + outerr = run_openjd_cli_main( + capsys, + args=[ + "run", + CHUNKED_JOB_TEMPLATE_FILE, + "--step", + "Chunked Step", + "-p", + "ChunkSize=1", + "-p", + "TargetRuntime=10000", + ], + expected_exit_code=0, + ) + + for expected_message_regex in expected_message_regex_list: + assert re.search( + expected_message_regex, outerr.out, re.MULTILINE + ), f"Regex r'{expected_message_regex}' not matched in:\n{format_capsys_outerr(outerr)}" + + +@pytest.mark.parametrize("target_runtime", [0, 1]) +def test_openjd_run_on_chunked_job_maximum_task_count(capsys, target_runtime): + # Test that running chunked_job.yaml with small chunks and a maximum task count will run the max count + # Runs with TargetRuntime=0 (fixed chunk size) and TargetRuntime=1 (adaptive chunk size) to + # exercise both task running inner loops. + expected_message_regex_list = [ + "Chunks run: 5$", + ] + + outerr = run_openjd_cli_main( + capsys, + args=[ + "run", + CHUNKED_JOB_TEMPLATE_FILE, + "--step", + "Chunked Step", + "-p", + "ChunkSize=3", + "-p", + f"TargetRuntime={target_runtime}", + "--maximum-tasks", + "5", + ], + expected_exit_code=0, + ) + + for expected_message_regex in expected_message_regex_list: + assert re.search( + expected_message_regex, outerr.out, re.MULTILINE + ), f"Regex r'{expected_message_regex}' not matched in:\n{format_capsys_outerr(outerr)}" + + +PARAMETRIZE_CASES: tuple = ( + pytest.param( + ["-tp", "Item=0"], + [ + r"Parameter Item of type CHUNK\[INT\] value 0 is not a subset of the range in the parameter space." 
+ ], + id="Item single value out of range", + ), + pytest.param( + ["-tp", "Item=1;2"], + [r"Parameter Item of type CHUNK\[INT\] value 1;2 is not a valid range expression"], + id="Item is not a range expr", + ), + pytest.param( + ["-tp", "Item=30-41"], + [ + r"Parameter Item of type CHUNK\[INT\] value 30-41 is not a subset of the range in the parameter space." + ], + id="Item interval out of range", + ), +) + + +@pytest.mark.parametrize("bad_task_params,expected_message_regex_list", PARAMETRIZE_CASES) +def test_openjd_run_on_chunked_job_bad_task_params( + capsys, bad_task_params, expected_message_regex_list +): + # Test that running chunked_job.yaml with various bad task parameters fails with expected messages + + outerr = run_openjd_cli_main( + capsys, + args=[ + "run", + CHUNKED_JOB_TEMPLATE_FILE, + "--step", + "Chunked Step", + *bad_task_params, + ], + expected_exit_code=1, + ) + + for expected_message_regex in expected_message_regex_list: + assert re.search( + expected_message_regex, outerr.out, re.MULTILINE + ), f"Regex r'{expected_message_regex}' not matched in:\n{format_capsys_outerr(outerr)}" diff --git a/test/openjd/cli/test_common.py b/test/openjd/cli/test_common.py index 01bd647..969a60a 100644 --- a/test/openjd/cli/test_common.py +++ b/test/openjd/cli/test_common.py @@ -132,7 +132,7 @@ def test_read_job_template_parsingerror(tempfile_extension: str, file_contents: mock_args = Path(temp_template.name) with pytest.raises(DecodeValidationError) as re: - read_job_template(mock_args) + read_job_template(mock_args, supported_extensions=[]) assert "validation errors for JobTemplate" in str(re.value) @@ -454,7 +454,7 @@ def test_generate_job_success( "openjd.cli._common.job_from_template", new=Mock(side_effect=job_from_template), ) as patched_job_from_template: - generate_job(mock_args) + generate_job(mock_args, supported_extensions=[]) patched_job_from_template.assert_called_once_with( ANY, expected_param_list, Path(temp_template.name).parent, 
Path(os.getcwd()) ) @@ -500,6 +500,6 @@ def test_generate_job_raises( ) with pytest.raises(RuntimeError) as excinfo: - generate_job(args) + generate_job(args, supported_extensions=[]) assert expected_error in str(excinfo.value) diff --git a/test/openjd/cli/test_local_session.py b/test/openjd/cli/test_local_session.py index f54af90..081cee0 100644 --- a/test/openjd/cli/test_local_session.py +++ b/test/openjd/cli/test_local_session.py @@ -5,8 +5,13 @@ import signal from . import SampleSteps, SESSION_PARAMETERS +from openjd.model import StepParameterSpaceIterator from openjd.sessions import Session, SessionState -from openjd.cli._run._local_session._session_manager import LocalSession +from openjd.cli._run._local_session._session_manager import ( + LocalSession, + EnvironmentType, + LocalSessionFailed, +) import openjd.cli._run._local_session._session_manager as local_session_mod @@ -43,91 +48,44 @@ def patched_actions(): yield patched_enter, patched_run, patched_exit, patched_callback -@pytest.mark.usefixtures("sample_job_and_dirs") -@pytest.mark.parametrize( - "given_parameters,expected_parameters", - [ - pytest.param( - {"TaskNumber": 5, "TaskMessage": "Hello!"}, - {"TaskNumber": "5", "TaskMessage": "Hello!"}, - id="All parameters provided", - ), - pytest.param( - {"TaskMessage": "Hello!"}, - {"TaskNumber": "1", "TaskMessage": "Hello!"}, - id="Some parameters provided", - ), - pytest.param( - {"FakeParameter": "Hello!", "TaskNumber": 5}, - {"TaskNumber": "5", "TaskMessage": "Hi!"}, - id="Unused parameter name", - ), - pytest.param( - {"FakeInt": 5, "FakeStr": "Hello!"}, - {"TaskNumber": "1", "TaskMessage": "Hi!"}, - id="Only unused parameter names", - ), - ], -) -def test_generate_task_parameter_set( - sample_job_and_dirs: tuple, given_parameters: dict, expected_parameters: dict -): - """ - Test that a LocalSession can generate Task parameters given valid user input. 
- """ - sample_job, template_dir, current_working_dir = sample_job_and_dirs - with LocalSession(job=sample_job, session_id="my-session") as session: - # Convince the type checker that `parameterSpace` exists - param_space = sample_job.steps[SampleSteps.TaskParamStep].parameterSpace - if param_space: - parameter_set = session._generate_task_parameter_set( - parameter_space=param_space, - parameter_values=given_parameters, - ) - - assert all( - [param.value == expected_parameters[name] for name, param in parameter_set.items()] - ) - - @pytest.mark.usefixtures("sample_job_and_dirs") @pytest.mark.parametrize(*SESSION_PARAMETERS) def test_localsession_initialize( sample_job_and_dirs: tuple, - dependency_indexes: list[int], step_index: int, maximum_tasks: int, parameter_sets: list[dict], - num_expected_environments: int, num_expected_tasks: int, ): """ - Test that initializing the local Session clears the `ended` flag, only generates Task parameters - when necessary, and adds to the Action queue appropriately. + Test that initializing the local Session enters external and job environments, and is ready to run tasks. 
""" sample_job, template_dir, current_working_dir = sample_job_and_dirs - with LocalSession(job=sample_job, session_id="my-session") as session: - with patch.object( + with ( + patch.object( LocalSession, - "_generate_task_parameter_set", + "run_environment_enters", autospec=True, - side_effect=LocalSession._generate_task_parameter_set, - ) as patched_generate_params: - session.initialize( - dependencies=[sample_job.steps[i] for i in dependency_indexes], - step=sample_job.steps[step_index], - maximum_tasks=maximum_tasks, - task_parameter_values=parameter_sets, - ) + side_effect=LocalSession.run_environment_enters, + ) as patched_run_environment_enters, + patch.object( + LocalSession, "run_step", autospec=True, side_effect=LocalSession.run_step + ) as patched_run_step, + ): + with LocalSession(job=sample_job, session_id="my-session") as session: + assert session._openjd_session.state == SessionState.READY - if parameter_sets and sample_job.steps[step_index].parameterSpace: - patched_generate_params.assert_called() - else: - patched_generate_params.assert_not_called() + # It should have entered the external and job environments in order + assert patched_run_environment_enters.call_count == 2 + assert patched_run_environment_enters.call_args_list[0] == call( + session, None, EnvironmentType.EXTERNAL + ) + assert patched_run_environment_enters.call_args_list[1] == call( + session, sample_job.jobEnvironments, EnvironmentType.JOB + ) - assert not session.ended.is_set() - assert session._enter_env_queue.qsize() == num_expected_environments - assert session._action_queue.qsize() == num_expected_tasks + # It should not have run any steps + assert patched_run_step.call_count == 0 @pytest.mark.usefixtures("sample_job_and_dirs") @@ -158,55 +116,75 @@ def test_localsession_traps_sigint(sample_job_and_dirs: tuple): def test_localsession_run_success( sample_job_and_dirs: tuple, capsys: pytest.CaptureFixture, - dependency_indexes: list[int], step_index: int, maximum_tasks: int, 
parameter_sets: list[dict], - num_expected_environments: int, num_expected_tasks: int, ): """ - Test that calling `run` causes the local Session to - iterate through the actions defined in the Job. + Test that calling `run_step` causes the local Session to run the tasks requested in that step. """ sample_job, template_dir, current_working_dir = sample_job_and_dirs - with LocalSession(job=sample_job, session_id="my-session") as session: - session.initialize( - dependencies=[sample_job.steps[i] for i in dependency_indexes], - step=sample_job.steps[step_index], - maximum_tasks=maximum_tasks, - task_parameter_values=parameter_sets, + + if parameter_sets is None: + parameter_sets = StepParameterSpaceIterator( + space=sample_job.steps[step_index].parameterSpace ) - session.run() - session.ended.wait() - - assert session.tasks_run == num_expected_tasks # type: ignore - assert session.get_duration() > 0 # type: ignore - assert session._inner_session.enter_environment.call_count == num_expected_environments # type: ignore - assert session._inner_session.run_task.call_count == num_expected_tasks # type: ignore - assert session._inner_session.exit_environment.call_count == num_expected_environments # type: ignore - session._action_callback.assert_called() # type: ignore - assert session._cleanup_called - assert ( - "Open Job Description CLI: All actions completed successfully!" 
in capsys.readouterr().out - ) + with ( + patch.object( + LocalSession, + "run_environment_enters", + autospec=True, + side_effect=LocalSession.run_environment_enters, + ) as patched_run_environment_enters, + patch.object( + LocalSession, + "run_environment_exits", + autospec=True, + side_effect=LocalSession.run_environment_exits, + ) as patched_run_environment_exits, + patch.object( + LocalSession, "run_step", autospec=True, side_effect=LocalSession.run_step + ) as patched_run_step, + patch.object( + LocalSession, "run_task", autospec=True, side_effect=LocalSession.run_task + ) as patched_run_task, + ): + with LocalSession(job=sample_job, session_id="my-session") as session: + session.run_step( + sample_job.steps[step_index], + task_parameters=parameter_sets, + maximum_tasks=maximum_tasks, + ) + # It should have entered the environments in order + assert patched_run_environment_enters.call_args_list == [ + call(session, None, EnvironmentType.EXTERNAL), + call(session, sample_job.jobEnvironments, EnvironmentType.JOB), + call(session, sample_job.steps[step_index].stepEnvironments, EnvironmentType.STEP), + ] + # It should have run one step + assert patched_run_step.call_args_list == [ + call( + session, + sample_job.steps[step_index], + task_parameters=parameter_sets, + maximum_tasks=maximum_tasks, + ) + ] + # It should have exited the environments in reverse order + assert patched_run_environment_exits.call_args_list == [ + call(session, type=EnvironmentType.STEP, keep_session_running=True), + call(session, type=EnvironmentType.ALL, keep_session_running=False), + ] -@pytest.mark.usefixtures("sample_job_and_dirs") -def test_localsession_run_not_ready(sample_job_and_dirs: tuple): - """ - Test that a LocalSession throws an error when it is not in the "READY" state. 
- """ - sample_job, template_dir, current_working_dir = sample_job_and_dirs - with LocalSession(job=sample_job, session_id="my-session") as session: - with ( - patch.object(Session, "state", new=SessionState.ENDED), - pytest.raises(RuntimeError) as rte, - ): - session.run() + assert patched_run_task.call_count == num_expected_tasks - assert "not in a READY state" in str(rte.value) + assert ( + "Open Job Description CLI: All actions completed successfully!" + in capsys.readouterr().out + ) @pytest.mark.usefixtures("sample_job_and_dirs", "capsys") @@ -215,14 +193,29 @@ def test_localsession_run_failed(sample_job_and_dirs: tuple, capsys: pytest.Capt Test that a LocalSession can gracefully handle an error in its inner Session. """ sample_job, template_dir, current_working_dir = sample_job_and_dirs - with LocalSession(job=sample_job, session_id="bad-session") as session: - session.initialize(dependencies=[], step=sample_job.steps[SampleSteps.BadCommand]) - session.run() - session.ended.wait() - - # The Task has failed. That means that we've entered the one environment and also exited it. - session._inner_session.enter_environment.assert_called_once() # type: ignore - session._inner_session.exit_environment.assert_called_once() # type: ignore - assert session.failed - assert session._cleanup_called - assert "Open Job Description CLI: ERROR" in capsys.readouterr().out + with ( + patch.object( + LocalSession, + "run_environment_enters", + autospec=True, + side_effect=LocalSession.run_environment_enters, + ) as patched_run_environment_enters, + ): + with LocalSession(job=sample_job, session_id="bad-session") as session: + with pytest.raises(LocalSessionFailed): + session.run_step(sample_job.steps[SampleSteps.BadCommand]) + + # The Task has failed. That means that we've entered the one environment and also exited it. 
+ assert patched_run_environment_enters.call_args_list == [ + call(session, None, EnvironmentType.EXTERNAL), + call(session, sample_job.jobEnvironments, EnvironmentType.JOB), + call( + session, + sample_job.steps[SampleSteps.BadCommand].stepEnvironments, + EnvironmentType.STEP, + ), + ] + session._openjd_session.exit_environment.assert_called_once() # type: ignore + assert session.failed + assert session._cleanup_called + assert "Open Job Description CLI: ERROR" in capsys.readouterr().out diff --git a/test/openjd/cli/test_run_command.py b/test/openjd/cli/test_run_command.py index 65047ce..368ad21 100644 --- a/test/openjd/cli/test_run_command.py +++ b/test/openjd/cli/test_run_command.py @@ -6,533 +6,239 @@ import tempfile import re import os -from typing import Any, Optional +from typing import Optional import logging +import shlex import pytest -from unittest.mock import Mock, patch +from unittest.mock import Mock + +from . import MOCK_TEMPLATE, SampleSteps, run_openjd_cli_main, format_capsys_outerr -from . 
import MOCK_TEMPLATE, SampleSteps from openjd.cli._run._run_command import ( - OpenJDRunResult, do_run, - _run_local_session, _process_task_params, _process_tasks, - _validate_task_params, ) -from openjd.cli._run._local_session._session_manager import LocalSession -from openjd.sessions import LOG as SessionsLogger, PathMappingRule, PathFormat, Session -from openjd.model import decode_job_template, create_job, ParameterValue, ParameterValueType - - -TEST_RUN_JOB_TEMPLATE_BASIC = { - "specificationVersion": "jobtemplate-2023-09", - "name": "Job", - "parameterDefinitions": [{"name": "J", "type": "STRING"}], - "jobEnvironments": [ - { - "name": "J1", - "script": { - "actions": { - "onEnter": {"command": "python", "args": ["-c", "print('J1 Enter')"]}, - "onExit": {"command": "python", "args": ["-c", "print('J1 Exit')"]}, - } - }, - }, - { - "name": "J2", - "script": { - "actions": { - "onEnter": {"command": "python", "args": ["-c", "print('J2 Enter')"]}, - "onExit": {"command": "python", "args": ["-c", "print('J2 Exit')"]}, - } - }, - }, - ], - "steps": [ - { - "name": "First", - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "Foo", "type": "INT", "range": "1"}, - {"name": "Bar", "type": "STRING", "range": ["Bar1", "Bar2"]}, - ] - }, - "stepEnvironments": [ - { - "name": "FirstS", - "script": { - "actions": { - "onEnter": { - "command": "python", - "args": ["-c", "print('FirstS Enter')"], - }, - "onExit": {"command": "python", "args": ["-c", "print('FirstS Exit')"]}, - } - }, - }, - ], - "script": { - "actions": { - "onRun": { - "command": "python", - "args": [ - "-c", - "print('J={{Param.J}} Foo={{Task.Param.Foo}}. 
Bar={{Task.Param.Bar}}')", - ], - } - } - }, - } - ], -} - -TEST_RUN_JOB_TEMPLATE_DEPENDENCY = { - "specificationVersion": "jobtemplate-2023-09", - "name": "Job", - "parameterDefinitions": [{"name": "J", "type": "STRING"}], - "jobEnvironments": [ - { - "name": "J1", - "script": { - "actions": { - "onEnter": {"command": "python", "args": ["-c", "print('J1 Enter')"]}, - "onExit": {"command": "python", "args": ["-c", "print('J1 Exit')"]}, - } - }, - }, - ], - "steps": [ - { - "name": "First", - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "Foo", "type": "INT", "range": "1"}, - {"name": "Bar", "type": "STRING", "range": ["Bar1", "Bar2"]}, - ] - }, - "script": { - "actions": { - "onRun": { - "command": "python", - "args": [ - "-c", - "print('J={{Param.J}} Foo={{Task.Param.Foo}}. Bar={{Task.Param.Bar}}')", - ], - } - } - }, - }, - { - "name": "Second", - "dependencies": [{"dependsOn": "First"}], - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "Fuz", "type": "INT", "range": "1-2"}, - ] - }, - "script": { - "actions": { - "onRun": { - "command": "python", - "args": [ - "-c", - "print('J={{Param.J}} Fuz={{Task.Param.Fuz}}.')", - ], - } - } - }, - }, - ], -} - -TEST_RUN_ENV_TEMPLATE_1 = { - "specificationVersion": "environment-2023-09", - "environment": { - "name": "Env1", - "script": { - "actions": { - "onEnter": {"command": "python", "args": ["-c", "print('Env1 Enter')"]}, - "onExit": {"command": "python", "args": ["-c", "print('Env1 Exit')"]}, - } - }, - }, -} - -TEST_RUN_ENV_TEMPLATE_2 = { - "specificationVersion": "environment-2023-09", - "environment": { - "name": "Env2", - "script": { - "actions": { - "onEnter": {"command": "python", "args": ["-c", "print('Env2 Enter')"]}, - "onExit": {"command": "python", "args": ["-c", "print('Env2 Exit')"]}, - } - }, - }, -} - -TEST_RUN_ENV_TEMPLATE_FAILS_ENTER = { - "specificationVersion": "environment-2023-09", - "environment": { - "name": "EnvEnterFail", - "script": { - "actions": { - 
"onEnter": { - "command": "python", - "args": ["-c", "import sys; print('EnvEnterFail Enter'); sys.exit(1)"], - }, - "onExit": {"command": "python", "args": ["-c", "print('EnvEnterFail Exit')"]}, - } - }, - }, -} - -TEST_RUN_ENV_TEMPLATE_FAILS_EXIT = { - "specificationVersion": "environment-2023-09", - "environment": { - "name": "EnvExitFail", - "script": { - "actions": { - "onEnter": {"command": "python", "args": ["-c", "print('EnvExitFail Enter')"]}, - "onExit": { - "command": "python", - "args": ["-c", "import sys; print('EnvExitFail Exit'); sys.exit(1)"], - }, - } - }, - }, -} - - -@pytest.mark.parametrize( - "job_template,env_templates,step_name,task_params,run_dependencies,expected_output,expected_not_in_output,expect_system_exit", - [ - pytest.param( - TEST_RUN_JOB_TEMPLATE_BASIC, - [], # Env Templates - "First", # step name - [], # Task params - True, # run_dependencies - re.compile( - r"J1 Enter.*J2 Enter.*FirstS Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*Foo=1. Bar=Bar2.*FirstS Exit.*J2 Exit.*J1 Exit" - ), - "", - False, - id="RunFirstStep", +from openjd.cli._run._local_session._session_manager import LoggingTimestampFormat +from openjd.sessions import LOG as SessionsLogger, PathMappingRule, PathFormat + + +PARAMETRIZE_CASES: tuple = ( + pytest.param( + "basic.yaml", + [], # Env Templates + "First", # step name + [], # Task params + True, # run_dependencies + re.compile( + r"J1 Enter.*J2 Enter.*FirstS Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*Foo=1. Bar=Bar2.*FirstS Exit.*J2 Exit.*J1 Exit" ), - pytest.param( - TEST_RUN_JOB_TEMPLATE_BASIC, - [], # Env Templates - "First", # step name - ["Foo=1", "Bar=Bar1"], # Task params - True, # run_dependencies - re.compile( - r"J1 Enter.*J2 Enter.*FirstS Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*FirstS Exit.*J2 Exit.*J1 Exit" - ), - "Foo=1. 
Bar=Bar2", - False, - id="RunSelectTask", + "", + 0, + id="RunFirstStep", + ), + pytest.param( + "basic.yaml", + [], # Env Templates + "First", # step name + ["-tp", "Foo=1", "-tp", "Bar=Bar1"], # Task params + True, # run_dependencies + re.compile( + r"J1 Enter.*J2 Enter.*FirstS Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*FirstS Exit.*J2 Exit.*J1 Exit" ), - pytest.param( - TEST_RUN_JOB_TEMPLATE_DEPENDENCY, - [], # Env Templates - "Second", # step name - [], # Task params - True, # run_dependencies - re.compile( - r"J1 Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*Foo=1. Bar=Bar2.*J=Jvalue Fuz=1.*J=Jvalue Fuz=2.*J1 Exit" - ), - "", - False, - id="RunSecondStepWithDep", + "Foo=1. Bar=Bar2", + 0, + id="RunSelectTask", + ), + pytest.param( + "basic_dependency_job.yaml", + [], # Env Templates + "Second", # step name + [], # Task params + True, # run_dependencies + re.compile( + r"J1 Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*Foo=1. Bar=Bar2.*J=Jvalue Fuz=1.*J=Jvalue Fuz=2.*J1 Exit" ), - pytest.param( - TEST_RUN_JOB_TEMPLATE_DEPENDENCY, - [], # Env Templates - "Second", # step name - [], # Task params - False, # run_dependencies - re.compile(r"J1 Enter.*J=Jvalue Fuz=1.*J=Jvalue Fuz=2.*J1 Exit"), - "Foo=1. Bar=Bar1", - False, - id="RunSecondStepNoDep", - ), - pytest.param( - TEST_RUN_JOB_TEMPLATE_BASIC, - [TEST_RUN_ENV_TEMPLATE_1], # Env Templates - "First", # step name - [], # Task params - True, # run_dependencies - re.compile( - r"Env1 Enter.*J1 Enter.*J2 Enter.*FirstS Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*Foo=1. Bar=Bar2.*FirstS Exit.*J2 Exit.*J1 Exit.*Env1 Exit" - ), - "", - False, - id="WithOneEnv", - ), - pytest.param( - TEST_RUN_JOB_TEMPLATE_BASIC, - [TEST_RUN_ENV_TEMPLATE_1, TEST_RUN_ENV_TEMPLATE_2], # Env Templates - "First", # step name - [], # Task params - True, # run_dependencies - re.compile( - r"Env1 Enter.*Env2 Enter.*J1 Enter.*J2 Enter.*FirstS Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*Foo=1. 
Bar=Bar2.*FirstS Exit.*J2 Exit.*J1 Exit.*Env2 Exit.*Env1 Exit" - ), - "", - False, - id="WithTwoEnvs", - ), - pytest.param( - { - "specificationVersion": "jobtemplate-2023-09", - "name": "Test", - "parameterDefinitions": [{"name": "J", "type": "STRING"}], - "steps": [ - { - "name": "SimpleStep", - "script": { - "actions": { - "onRun": { - "command": "python", - "args": ["-c", "print('DoTask')"], - } - } - }, - } - ], - }, - [TEST_RUN_ENV_TEMPLATE_FAILS_ENTER], # Env Templates - "SimpleStep", # step name - [], # Task params - False, # run_dependencies - re.compile(r"EnvEnterFail Enter.*EnvEnterFail Exit"), - # We should not run the task - "DoTask", - True, - id="EnterEnvFails", - ), - pytest.param( - { - "specificationVersion": "jobtemplate-2023-09", - "name": "Test", - "parameterDefinitions": [{"name": "J", "type": "STRING"}], - "steps": [ - { - "name": "SimpleStep", - "script": { - "actions": { - "onRun": { - "command": "python", - "args": ["-c", "print('DoTask')"], - } - } - }, - } - ], - }, - [TEST_RUN_ENV_TEMPLATE_FAILS_ENTER, TEST_RUN_ENV_TEMPLATE_1], # Env Templates - "SimpleStep", # step name - [], # Task params - False, # run_dependencies - re.compile(r"EnvEnterFail Enter.*EnvEnterFail Exit"), - # We should not run the second environment - "Env1 Enter", - True, - id="EnterEnvFails_2", - ), - pytest.param( - { - "specificationVersion": "jobtemplate-2023-09", - "name": "Test", - "parameterDefinitions": [{"name": "J", "type": "STRING"}], - "steps": [ - { - "name": "SimpleStep", - "script": { - "actions": { - "onRun": { - "command": "python", - "args": ["-c", "import sys; print('DoTask'); sys.exit(1)"], - } - } - }, - } - ], - }, - [TEST_RUN_ENV_TEMPLATE_1], # Env Templates - "SimpleStep", # step name - [], # Task params - False, # run_dependencies - # Task fails; we should still run everything - re.compile(r"Env1 Enter.*DoTask.*Env1 Exit"), - "", - True, - id="TaskFails", + "", + 0, + id="RunSecondStepWithDep", + ), + pytest.param( + 
"basic_dependency_job.yaml", + [], # Env Templates + "Second", # step name + [], # Task params + False, # run_dependencies + re.compile(r"J1 Enter.*J=Jvalue Fuz=1.*J=Jvalue Fuz=2.*J1 Exit"), + "Foo=1. Bar=Bar1", + 0, + id="RunSecondStepNoDep", + ), + pytest.param( + "basic.yaml", + ["env_1.yaml"], # Env Templates + "First", # step name + [], # Task params + True, # run_dependencies + re.compile( + r"Env1 Enter.*J1 Enter.*J2 Enter.*FirstS Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*Foo=1. Bar=Bar2.*FirstS Exit.*J2 Exit.*J1 Exit.*Env1 Exit" ), - pytest.param( - { - "specificationVersion": "jobtemplate-2023-09", - "name": "Test", - "parameterDefinitions": [{"name": "J", "type": "STRING"}], - "steps": [ - { - "name": "SimpleStep", - "script": { - "actions": { - "onRun": { - "command": "python", - "args": ["-c", "print('DoTask')"], - } - } - }, - } - ], - }, - [TEST_RUN_ENV_TEMPLATE_FAILS_EXIT], # Env Templates - "SimpleStep", # step name - [], # Task params - False, # run_dependencies - re.compile( - # environment exit fails; we still run everything - r"EnvExitFail Enter.*DoTask.*EnvExitFail Exit" - ), - "", - True, - id="EnvExitFails", + "", + 0, + id="WithOneEnv", + ), + pytest.param( + "basic.yaml", + ["env_1.yaml", "env_2.yaml"], # Env Templates + "First", # step name + [], # Task params + True, # run_dependencies + re.compile( + r"Env1 Enter.*Env2 Enter.*J1 Enter.*J2 Enter.*FirstS Enter.*J=Jvalue.*Foo=1. Bar=Bar1.*Foo=1. 
Bar=Bar2.*FirstS Exit.*J2 Exit.*J1 Exit.*Env2 Exit.*Env1 Exit" ), - pytest.param( - { - "specificationVersion": "jobtemplate-2023-09", - "name": "Test", - "parameterDefinitions": [{"name": "J", "type": "STRING"}], - "steps": [ - { - "name": "SimpleStep", - "script": { - "actions": { - "onRun": { - "command": "python", - "args": ["-c", "print('DoTask')"], - } - } - }, - } - ], - }, - [TEST_RUN_ENV_TEMPLATE_1, TEST_RUN_ENV_TEMPLATE_FAILS_EXIT], # Env Templates - "SimpleStep", # step name - [], # Task params - False, # run_dependencies - re.compile( - # environment exit fails; we still run everything - r"Env1 Enter.*EnvExitFail Enter.*DoTask.*EnvExitFail Exit.*Env1 Exit" - ), - "", - True, - id="EnvExitFails_2", + "", + 0, + id="WithTwoEnvs", + ), + pytest.param( + "simple_with_j_param.yaml", + ["env_fails_enter.yaml"], # Env Templates + "SimpleStep", # step name + [], # Task params + False, # run_dependencies + re.compile(r"EnvEnterFail Enter.*EnvEnterFail Exit"), + # We should not run the task + "DoTask", + 1, + id="EnterEnvFails", + ), + pytest.param( + "simple_with_j_param.yaml", + ["env_fails_enter.yaml", "env_1.yaml"], # Env Templates + "SimpleStep", # step name + [], # Task params + False, # run_dependencies + re.compile(r"EnvEnterFail Enter.*EnvEnterFail Exit"), + # We should not run the second environment + "Env1 Enter", + 1, + id="EnterEnvFails_2", + ), + pytest.param( + "simple_with_j_param_exit_1.yaml", + ["env_1.yaml"], # Env Templates + "SimpleStep", # step name + [], # Task params + False, # run_dependencies + # Task fails; we should still run everything + re.compile(r"Env1 Enter.*DoTask.*Env1 Exit"), + "", + 1, + id="TaskFails", + ), + pytest.param( + "simple_with_j_param.yaml", + ["env_fails_exit.yaml"], # Env Templates + "SimpleStep", # step name + [], # Task params + False, # run_dependencies + re.compile( + # environment exit fails; we still run everything + r"EnvExitFail Enter.*DoTask.*EnvExitFail Exit" ), - pytest.param( - { - 
"specificationVersion": "jobtemplate-2023-09", - "name": "TimeoutTest", - "parameterDefinitions": [{"name": "J", "type": "STRING"}], - "steps": [ - { - "name": "Timeout", - "script": { - "actions": { - "onRun": { - "command": "python", - "args": [ - "-c", - # Obfuscate "EXIT_NORMAL" so it doesn't appear in the log when Windows prints the command that's run to the log. - "import time,sys; print('SLEEP'); sys.stdout.flush(); time.sleep(5); print(chr(69)+'XIT_NORMAL')", - ], - "timeout": 2, - } - } - }, - } - ], - }, - [], # Env Templates - "Timeout", # step name - [], # Task params - False, # run_dependencies - re.compile(r"SLEEP"), - "EXIT_NORMAL", - True, - id="TaskTimeout", + "", + 1, + id="EnvExitFails", + ), + pytest.param( + "simple_with_j_param.yaml", + ["env_1.yaml", "env_fails_exit.yaml"], # Env Templates + "SimpleStep", # step name + [], # Task params + False, # run_dependencies + re.compile( + # environment exit fails; we still run everything + r"Env1 Enter.*EnvExitFail Enter.*DoTask.*EnvExitFail Exit.*Env1 Exit" ), - ], + "", + 1, + id="EnvExitFails_2", + ), + pytest.param( + "job_sleep_exit_normal.yaml", + [], # Env Templates + "Timeout", # step name + [], # Task params + False, # run_dependencies + re.compile(r"SLEEP"), + "EXIT_NORMAL", + 1, + id="TaskTimeout", + ), +) + + +@pytest.mark.parametrize( + "job_template_file,env_template_files,step_name,task_params,run_dependencies,expected_output_regex,expected_not_in_output,expected_exit_code", + PARAMETRIZE_CASES, ) def test_do_run_success( - job_template: dict[str, Any], - env_templates: list[dict[str, Any]], + job_template_file: str, + env_template_files: list[str], step_name: str, task_params: list[str], run_dependencies: bool, - expected_output: re.Pattern[str], + expected_output_regex: re.Pattern[str], expected_not_in_output: str, - expect_system_exit: bool, - caplog: pytest.LogCaptureFixture, + expected_exit_code: int, + capsys: pytest.CaptureFixture, ) -> None: """Test that the 'run' command 
correctly runs templates and obtains the expected results.""" - files_created: list[Path] = [] - try: - # GIVEN - with tempfile.NamedTemporaryFile( - mode="w+t", suffix=".template.json", encoding="utf8", delete=False - ) as job_template_file: - json.dump(job_template, job_template_file.file) - files_created.append(Path(job_template_file.name)) - - environments_files: list[str] = [] - for e in env_templates: - with tempfile.NamedTemporaryFile( - mode="w+t", suffix=".env.template.json", encoding="utf8", delete=False - ) as file: - json.dump(e, file.file) - files_created.append(Path(file.name)) - environments_files.append(file.name) + template_dir = Path(__file__).parent / "templates" - args = Namespace( - path=Path(job_template_file.name), - step=step_name, - job_params=["J=Jvalue"], - task_params=task_params, - tasks=None, - maximum_tasks=-1, - run_dependencies=run_dependencies, - path_mapping_rules=None, - environments=environments_files, - output="human-readable", - verbose=False, - preserve=False, + extra_options = [] + if run_dependencies: + extra_options.append("--run-dependencies") + if env_template_files: + extra_options.extend( + [ + entry + for file in env_template_files + for entry in ["--environment", str(template_dir / file)] + ] ) - # WHEN - try: - do_run(args) - except SystemExit: - assert expect_system_exit - else: - assert not expect_system_exit + args = [ + "run", + str(template_dir / job_template_file), + "--step", + step_name, + "-p", + "J=Jvalue", + *task_params, + *extra_options, + "--extensions", + "", + ] - # THEN - assert not any( - os.linesep in m for m in caplog.messages - ), "paranoia; Windows is acting weird" - assert expected_output.search("".join(m.strip() for m in caplog.messages)) - if expected_not_in_output: - assert expected_not_in_output not in caplog.text - finally: - for f in files_created: - f.unlink() + print(f"openjd {shlex.join(args)}") + + outerr = run_openjd_cli_main(capsys, args=args, expected_exit_code=expected_exit_code) 
+ + expected_was_found = expected_output_regex.search(outerr.out.replace("\n", "\\n"), re.MULTILINE) + if expected_was_found is None: + # Print out the environment and job templates for easier error debugging from the log outputs + print("\n ENV TEMPLATES:\n") + print(json.dumps(env_template_files, indent=1)) + print("\n JOB TEMPLATE:\n") + print(json.dumps(job_template_file, indent=1)) + assert ( + expected_was_found + ), f"Regex r'{expected_output_regex.pattern}' does not match the output:\n{format_capsys_outerr(outerr)}" + if expected_not_in_output: + assert expected_not_in_output not in outerr.out def test_preserve_option( @@ -571,6 +277,7 @@ def test_preserve_option( args = Namespace( path=Path(job_template_file.name), step="TestStep", + timestamp_format=LoggingTimestampFormat.RELATIVE, job_params=[], task_params=None, tasks=None, @@ -581,6 +288,7 @@ def test_preserve_option( output="human-readable", verbose=False, preserve=True, + extensions="", ) # WHEN @@ -634,6 +342,7 @@ def test_verbose_option( args = Namespace( path=Path(job_template_file.name), step="TestStep", + timestamp_format=LoggingTimestampFormat.RELATIVE, job_params=[], task_params=None, tasks=None, @@ -644,6 +353,7 @@ def test_verbose_option( output="human-readable", verbose=True, preserve=False, + extensions="", ) # WHEN @@ -666,6 +376,7 @@ def test_do_run_error(): mock_args = Namespace( path=Path("some-file.json"), step="aStep", + timestamp_format=LoggingTimestampFormat.RELATIVE, job_params=None, task_params=None, run_dependencies=False, @@ -674,6 +385,7 @@ def test_do_run_error(): output="human-readable", verbose=False, preserve=False, + extensions="", ) with pytest.raises(SystemExit): do_run(mock_args) @@ -728,6 +440,7 @@ def test_do_run_path_mapping_rules(caplog: pytest.LogCaptureFixture): run_args = Namespace( path=Path(temp_template.name), step="TestStep", + timestamp_format=LoggingTimestampFormat.RELATIVE, job_params=[r"TestPath=/home/test" if os.name == "posix" else 
r"TestPath=c:\test"], task_params=None, tasks=None, @@ -738,6 +451,7 @@ def test_do_run_path_mapping_rules(caplog: pytest.LogCaptureFixture): maximum_tasks=1, verbose=False, preserve=False, + extensions="", ) # WHEN @@ -772,6 +486,7 @@ def test_do_run_nonexistent_step(capsys: pytest.CaptureFixture): mock_args = Namespace( path=Path(temp_template.name), step="FakeStep", + timestamp_format=LoggingTimestampFormat.RELATIVE, job_params=None, task_params=None, tasks=None, @@ -782,6 +497,7 @@ def test_do_run_nonexistent_step(capsys: pytest.CaptureFixture): output="human-readable", verbose=False, preserve=False, + extensions="", ) with pytest.raises(SystemExit): do_run(mock_args) @@ -793,54 +509,71 @@ def test_do_run_nonexistent_step(capsys: pytest.CaptureFixture): Path(temp_template.name).unlink() -@pytest.mark.usefixtures( - "sample_job_and_dirs", "sample_step_map", "patched_session_cleanup", "capsys" -) -@pytest.mark.parametrize( - "step_index,dependency_indexes,should_run_dependencies", - [ - pytest.param(SampleSteps.BareStep, [], False, id="Bare Step"), - pytest.param( +PARAMETRIZE_CASES = ( + pytest.param(SampleSteps.BareStep, [], [], id="Bare Step without --run-dependencies"), + pytest.param( + SampleSteps.BareStep, [], ["--run-dependencies"], id="Bare Step with --run-dependencies" + ), + pytest.param( + SampleSteps.BareStep, + [], + ["--run-dependencies"], + id="--run-dependencies with no dependencies", + ), + pytest.param(SampleSteps.TaskParamStep, [], ["--run-dependencies"], id="Task param Step"), + pytest.param(SampleSteps.NormalStep, [], ["--run-dependencies"], id="Catch-all Step"), + pytest.param( + SampleSteps.NormalStep, + [], + ["--run-dependencies"], + id="--run-dependencies with Step environment but no dependencies", + ), + pytest.param( + SampleSteps.DependentStep, + [SampleSteps.BareStep], + ["--run-dependencies"], + id="Step with direct dependency", + ), + pytest.param( + SampleSteps.ExtraDependentStep, + [ SampleSteps.BareStep, - [], - True, - 
id="--run-dependencies with no dependencies", - ), - pytest.param(SampleSteps.TaskParamStep, [], False, id="Task param Step"), - pytest.param(SampleSteps.NormalStep, [], False, id="Catch-all Step"), - pytest.param( - SampleSteps.NormalStep, - [], - True, - id="--run-dependencies with Step environment but no dependencies", - ), - pytest.param( SampleSteps.DependentStep, - [SampleSteps.BareStep], - True, - id="Step with direct dependency", - ), - pytest.param( - SampleSteps.ExtraDependentStep, - [ - SampleSteps.BareStep, - SampleSteps.DependentStep, - SampleSteps.TaskParamStep, - ], - True, - id="Step with transitive and direct dependencies", - ), - pytest.param(SampleSteps.DependentStep, [], False, id="Exclude dependencies"), - ], + SampleSteps.TaskParamStep, + ], + ["--run-dependencies"], + id="Step with transitive and direct dependencies", + ), + pytest.param(SampleSteps.DependentStep, [], [], id="Exclude dependencies implicitly"), + pytest.param( + SampleSteps.DependentStep, + [], + ["--no-run-dependencies"], + id="Exclude dependencies with explicit option", + ), + pytest.param( + SampleSteps.StepDepHasStepEnv, + [SampleSteps.NormalStep], + ["--run-dependencies"], + id="Step with a dependency that has step envs", + ), +) + + +@pytest.mark.parametrize( + "step_index,dependency_indexes,extra_options", + PARAMETRIZE_CASES, +) +@pytest.mark.usefixtures( + "sample_job_and_dirs", "sample_step_map", "patched_session_cleanup", "capsys" ) def test_run_local_session_success( - sample_job_and_dirs: tuple, sample_step_map: dict, patched_session_cleanup: Mock, capsys: pytest.CaptureFixture, - step_index: int, - dependency_indexes: list[int], - should_run_dependencies: bool, + step_index: SampleSteps, + dependency_indexes: list[SampleSteps], + extra_options: list[str], ): """ Test that various Job structures can successfully run local Sessions. 
@@ -848,79 +581,82 @@ def test_run_local_session_success( Note that we don't need to test with custom Task parameters, as those are tested within the `LocalSession` object. """ - sample_job, template_dir, current_working_dir = sample_job_and_dirs + + template_dir = Path(__file__).parent / "templates" path_mapping_rules = [ PathMappingRule( source_path_format=PathFormat.WINDOWS, source_path=PureWindowsPath(r"C:\test"), destination_path=PurePosixPath("/mnt/test"), - ) + ).to_dict() ] - with ( - patch.object( - LocalSession, "initialize", autospec=True, side_effect=LocalSession.initialize - ) as patched_initialize, - patch.object( - Session, "__init__", autospec=True, side_effect=Session.__init__ - ) as patched_session_init, - ): - response = _run_local_session( - job=sample_job, - step_map=sample_step_map, - step=sample_job.steps[step_index], - path_mapping_rules=path_mapping_rules, - should_run_dependencies=should_run_dependencies, - ) - assert patched_initialize.call_args.kwargs["dependencies"] == [ - sample_job.steps[i] for i in dependency_indexes - ] - assert patched_initialize.call_args.kwargs["step"] == sample_job.steps[step_index] - assert patched_session_init.call_args.kwargs["path_mapping_rules"] == path_mapping_rules + args = [ + "run", + str(template_dir / "job_with_test_steps.yaml"), + "--step", + step_index.name, + "--extensions", + "", + *extra_options, + "--path-mapping-rules", + json.dumps({"version": "pathmapping-1.0", "path_mapping_rules": path_mapping_rules}), + ] + print(f"openjd {shlex.join(args)}") + + outerr = run_openjd_cli_main(capsys, args=args, expected_exit_code=0) - assert response.status == "success" - assert isinstance(response, OpenJDRunResult) - assert response.job_name == sample_job.name - assert response.step_name == sample_job.steps[step_index].name - assert "Open Job Description CLI: All actions completed successfully" in capsys.readouterr().out - patched_session_cleanup.assert_called() + for expected_output_regex in [ + "Running 
job 'my-job'", + *(f"Running step '{dep_index.name}'" for dep_index in dependency_indexes), + f"Running step '{step_index.name}'", + "All actions completed successfully!", + ]: + assert re.search( + expected_output_regex, outerr.out, re.MULTILINE + ), f"Regex r'{expected_output_regex}' does not match the output:\n{format_capsys_outerr(outerr)}" -@pytest.mark.usefixtures("sample_job_and_dirs", "sample_step_map") @pytest.mark.parametrize( - "step_index,should_run_dependencies,expected_error", + "step_index,expected_error_regex", [ pytest.param( - SampleSteps.BadCommand, False, "Session ended with errors", id="Badly-formed command" - ), - pytest.param( - SampleSteps.ShouldSeparateSession, - True, - "cannot be run in the same local Session", - id="Can't run in single Session", + SampleSteps.BadCommand, "Session ended with errors", id="Badly-formed command" ), ], ) def test_run_local_session_failed( - sample_job_and_dirs: tuple, - sample_step_map: dict, - step_index: int, - should_run_dependencies: bool, - expected_error: str, + capsys, + step_index: SampleSteps, + expected_error_regex: str, ): """ Test the output of a Session that finishes after encountering errors. 
""" - sample_job, template_dir, current_working_dir = sample_job_and_dirs - response = _run_local_session( - job=sample_job, - step_map=sample_step_map, - step=sample_job.steps[step_index], - path_mapping_rules=[], - should_run_dependencies=should_run_dependencies, - ) + template_dir = Path(__file__).parent / "templates" + path_mapping_rules = [ + PathMappingRule( + source_path_format=PathFormat.WINDOWS, + source_path=PureWindowsPath(r"C:\test"), + destination_path=PurePosixPath("/mnt/test"), + ).to_dict() + ] + args = [ + "run", + str(template_dir / "job_with_test_steps.yaml"), + "--step", + step_index.name, + "--extensions", + "", + "--path-mapping-rules", + json.dumps({"version": "pathmapping-1.0", "path_mapping_rules": path_mapping_rules}), + ] + print(f"openjd {shlex.join(args)}") + + outerr = run_openjd_cli_main(capsys, args=args, expected_exit_code=1) - assert response.status == "error" - assert expected_error in response.message + assert re.search( + expected_error_regex, outerr.out, re.MULTILINE + ), f"Regex r'{expected_error_regex}' does not match the output:\n{format_capsys_outerr(outerr)}" class TestProcessTaskParams: @@ -1075,66 +811,86 @@ def test_error(self, given: str, file_contents: Optional[str], expected_error: s _process_tasks(given) -class TestValidateTaskParams: +@pytest.mark.parametrize( + "task_params", + [ + pytest.param( + ["--tasks", '[{"Foo": "1", "Bar": "Bar1"}]'], id="one task, all params defined" + ), + pytest.param( + ["--tasks", '[{"Foo": "1", "Bar": "Bar1"}, {"Foo": "1", "Bar": "Bar1"}]'], + id="two tasks", + ), + ], +) +def test_task_param_validation_success(capsys, task_params: list[str]) -> None: + template_dir = Path(__file__).parent / "templates" + + args = [ + "run", + str(template_dir / "basic.yaml"), + "-p", + "J=Jvalue", + *task_params, + "--extensions", + "", + ] - @pytest.mark.parametrize( - "given", - [ - pytest.param([{"Foo": "1", "Bar": "Bar1"}], id="one task, all params defined"), - pytest.param( - [{"Foo": "1", 
"Bar": "Bar1"}, {"Foo": "1", "Bar": "Bar1"}], id="two tasks" - ), - ], - ) - def test_success(self, given: list[dict[str, str]]) -> None: - # GIVEN - job_template = decode_job_template(template=TEST_RUN_JOB_TEMPLATE_BASIC) - job = create_job( - job_template=job_template, - job_parameter_values={ - "J": ParameterValue(type=ParameterValueType.STRING, value="Jvalue") - }, - ) - step = job.steps[0] + print(f"openjd {shlex.join(args)}") - # THEN - # Does not raise - _validate_task_params(step, given) + # Ensure it runs with success exit code + run_openjd_cli_main(capsys, args=args, expected_exit_code=0) - @pytest.mark.parametrize( - "given, expected_error", - [ - pytest.param( - [{"Bar": "Bar1"}], "Task 0 is missing values for parameters: Foo", id="missing Foo" - ), - pytest.param( - [{"Bar": "Bar1"}, {"Foo": "1"}], - "Task 0 is missing values for parameters: Foo.*\n.*Task 1 is missing values for parameters: Bar", - id="missing Foo & Bar; separate tasks", - ), - pytest.param( - [{"Foo": "1", "Bar": "Bar1", "Baz": "wut"}], + +@pytest.mark.parametrize( + "task_params, expected_error_list", + [ + pytest.param( + ["-tp", "Bar=Bar1"], ["Task 0 is missing values for parameters: Foo"], id="missing Foo" + ), + pytest.param( + ["--tasks", '[{"Bar":"Bar1"}, {"Foo":"1"}]'], + [ + "Task 0 is missing values for parameters: Foo", + "Task 1 is missing values for parameters: Bar", + ], + id="missing Foo & Bar; separate tasks", + ), + pytest.param( + ["-tp", "Foo=1", "-tp", "Bar=Bar1", "-tp", "Baz=wut"], + ["Task 0 defines unknown parameters: Baz"], + id="extra parameter", + ), + pytest.param( + ["-tp", "Bar=Bar1", "-tp", "Baz=wut"], + [ "Task 0 defines unknown parameters: Baz", - id="extra parameter", - ), - pytest.param( - [{"Bar": "Bar1", "Baz": "wut"}], - "Task 0 defines unknown parameters: Baz.*\n.*Task 0 is missing values for parameters: Foo", - id="missing & extra parameter", - ), - ], - ) - def test_errors(self, given: list[dict[str, str]], expected_error: str) -> None: - # 
GIVEN - job_template = decode_job_template(template=TEST_RUN_JOB_TEMPLATE_BASIC) - job = create_job( - job_template=job_template, - job_parameter_values={ - "J": ParameterValue(type=ParameterValueType.STRING, value="Jvalue") - }, - ) - step = job.steps[0] + "Task 0 is missing values for parameters: Foo", + ], + id="missing & extra parameter", + ), + ], +) +def test_task_param_validation_errors( + capsys, task_params: list[str], expected_error_list: list[str] +) -> None: + template_dir = Path(__file__).parent / "templates" + + args = [ + "run", + str(template_dir / "basic.yaml"), + "-p", + "J=Jvalue", + *task_params, + "--extensions", + "", + ] - # THEN - with pytest.raises(RuntimeError, match=expected_error): - _validate_task_params(step, given) + print(f"openjd {shlex.join(args)}") + + outerr = run_openjd_cli_main(capsys, args=args, expected_exit_code=1) + + for expected_error in expected_error_list: + assert ( + expected_error in outerr.out + ), f"Message r'{expected_error}' was not found in the output:\n{format_capsys_outerr(outerr)}" diff --git a/test/openjd/cli/test_summary_command.py b/test/openjd/cli/test_summary_command.py index e1d316d..af7a405 100644 --- a/test/openjd/cli/test_summary_command.py +++ b/test/openjd/cli/test_summary_command.py @@ -62,6 +62,7 @@ def test_do_summary_success( job_params=mock_params, step=mock_step, output="human-readable", + extensions="", ) do_summary(mock_args) @@ -72,7 +73,7 @@ def test_do_summary_error(): """ Test that the `summary` command exits on any error (in this case, we mock an error in `read_template`) """ - mock_args = Namespace(path=Path("some-file.json"), output="human-readable") + mock_args = Namespace(path=Path("some-file.json"), output="human-readable", extensions="") with ( patch("openjd.cli._common.read_template", new=Mock(side_effect=RuntimeError())), pytest.raises(SystemExit), @@ -80,149 +81,148 @@ def test_do_summary_error(): do_summary(mock_args) -@pytest.mark.parametrize( - 
"mock_job_params,expected_job_name,step_name,expected_tasks,expected_dependencies,expected_total_envs,template_dict", - [ - pytest.param( - JobParameterValues({}), - "my-job", - "BareStep", - 1, - [], - 0, - MOCK_TEMPLATE, - id="No Job parameters, dependencies, or environments", - ), - pytest.param( - JobParameterValues({}), - "my-job", - "DependentStep", - 1, - ["NormalStep"], - 0, - MOCK_TEMPLATE, - id="With dependencies", - ), - pytest.param( - JobParameterValues({}), - "my-job", - "NormalStep", - 1, - [], - 1, - MOCK_TEMPLATE, - id="With environments", +PARAMETRIZE_CASES: tuple = ( + pytest.param( + JobParameterValues({}), + "my-job", + "BareStep", + 1, + [], + 0, + MOCK_TEMPLATE, + id="No Job parameters, dependencies, or environments", + ), + pytest.param( + JobParameterValues({}), + "my-job", + "DependentStep", + 1, + ["NormalStep"], + 0, + MOCK_TEMPLATE, + id="With dependencies", + ), + pytest.param( + JobParameterValues({}), + "my-job", + "NormalStep", + 1, + [], + 1, + MOCK_TEMPLATE, + id="With environments", + ), + pytest.param( + JobParameterValues( + { + "Title": ParameterValue(type=ParameterValueType.STRING, value="new title"), + "RequiredParam": ParameterValue(type=ParameterValueType.INT, value="5"), + } ), - pytest.param( - JobParameterValues( + "new title", + "step1", + 1, + [], + 0, + MOCK_TEMPLATE_REQUIRES_PARAMS, + id="Job parameters supplied", + ), + pytest.param( + {}, + "template", + "step1", + 5, + [], + 0, + { + "specificationVersion": "jobtemplate-2023-09", + "name": "template", + "steps": [ { - "Title": ParameterValue(type=ParameterValueType.STRING, value="new title"), - "RequiredParam": ParameterValue(type=ParameterValueType.INT, value="5"), + "name": "step1", + "parameterSpace": { + "taskParameterDefinitions": [ + {"name": "taskNumber", "type": "INT", "range": [1, 2, 3, 4, 5]} + ] + }, + "script": { + "actions": { + "onRun": {"command": 'echo "Task ran {{Task.Param.taskNumber}} times"'} + } + }, } - ), - "new title", - "step1", - 1, - 
[], - 0, - MOCK_TEMPLATE_REQUIRES_PARAMS, - id="Job parameters supplied", - ), - pytest.param( - {}, - "template", - "step1", - 5, - [], - 0, - { - "specificationVersion": "jobtemplate-2023-09", - "name": "template", - "steps": [ - { - "name": "step1", - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "taskNumber", "type": "INT", "range": [1, 2, 3, 4, 5]} - ] - }, - "script": { - "actions": { - "onRun": { - "command": 'echo "Task ran {{Task.Param.taskNumber}} times"' - } - } - }, - } - ], - }, - id="With Task parameters", - ), - pytest.param( - JobParameterValues({"Runs": ParameterValue(type=ParameterValueType.INT, value="7")}), - "template", - "step1", - 8, - [], - 0, - { - "specificationVersion": "jobtemplate-2023-09", - "name": "template", - "parameterDefinitions": [{"name": "Runs", "type": "INT", "default": 1}], - "steps": [ - { - "name": "step1", - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "taskNumber", "type": "INT", "range": "0-{{Param.Runs}}"} - ] - }, - "script": { - "actions": { - "onRun": { - "command": 'echo "Task ran {{Task.Param.taskNumber}} times"' - } - } - }, - } - ], - }, - id="Task parameters set by Job parameter", - ), - pytest.param( - {}, - "template", - "step1", - 10, - [], - 0, - { - "specificationVersion": "jobtemplate-2023-09", - "name": "template", - "steps": [ - { - "name": "step1", - "parameterSpace": { - "taskParameterDefinitions": [ - {"name": "param1", "type": "INT", "range": [1, 2, 3, 4, 5]}, - {"name": "param2", "type": "INT", "range": [6, 7, 8, 9, 10]}, - {"name": "param3", "type": "STRING", "range": ["yes", "no"]}, - ], - "combination": "(param1, param2) * param3", - }, - "script": { - "actions": { - "onRun": { - "command": 'echo "{{Task.Param.param1}} {{Task.Param.param2}} {{Task.Param.param3}}"' - } + ], + }, + id="With Task parameters", + ), + pytest.param( + JobParameterValues({"Runs": ParameterValue(type=ParameterValueType.INT, value="7")}), + "template", + "step1", + 8, + [], + 0, + { + 
"specificationVersion": "jobtemplate-2023-09", + "name": "template", + "parameterDefinitions": [{"name": "Runs", "type": "INT", "default": 1}], + "steps": [ + { + "name": "step1", + "parameterSpace": { + "taskParameterDefinitions": [ + {"name": "taskNumber", "type": "INT", "range": "0-{{Param.Runs}}"} + ] + }, + "script": { + "actions": { + "onRun": {"command": 'echo "Task ran {{Task.Param.taskNumber}} times"'} + } + }, + } + ], + }, + id="Task parameters set by Job parameter", + ), + pytest.param( + {}, + "template", + "step1", + 10, + [], + 0, + { + "specificationVersion": "jobtemplate-2023-09", + "name": "template", + "steps": [ + { + "name": "step1", + "parameterSpace": { + "taskParameterDefinitions": [ + {"name": "param1", "type": "INT", "range": [1, 2, 3, 4, 5]}, + {"name": "param2", "type": "INT", "range": [6, 7, 8, 9, 10]}, + {"name": "param3", "type": "STRING", "range": ["yes", "no"]}, + ], + "combination": "(param1, param2) * param3", + }, + "script": { + "actions": { + "onRun": { + "command": 'echo "{{Task.Param.param1}} {{Task.Param.param2}} {{Task.Param.param3}}"' } - }, + } }, - ], - }, - id="Task parameters with combination expression", - ), - ], + }, + ], + }, + id="Task parameters with combination expression", + ), +) + + +@pytest.mark.parametrize( + "mock_job_params,expected_job_name,step_name,expected_tasks,expected_dependencies,expected_total_envs,template_dict", + PARAMETRIZE_CASES, ) def test_get_output_step_summary_success( mock_job_params: JobParameterValues, @@ -236,7 +236,7 @@ def test_get_output_step_summary_success( """ Test that `output_summary_result` returns an object with the expected values when called with a Step. 
""" - template = decode_job_template(template=template_dict) + template = decode_job_template(template=template_dict, supported_extensions=[]) job = create_job(job_template=template, job_parameter_values=mock_job_params) response = output_summary_result(job, step_name) @@ -256,7 +256,7 @@ def test_output_step_summary_result_error(): Test that `output_summary_result` throws an error if a non-existent Step name is provided. (function only has one error state) """ - template = decode_job_template(template=MOCK_TEMPLATE) + template = decode_job_template(template=MOCK_TEMPLATE, supported_extensions=[]) job = create_job(job_template=template, job_parameter_values={}) response = output_summary_result(job, "no step") @@ -264,180 +264,173 @@ def test_output_step_summary_result_error(): assert "Step 'no step' does not exist in Job 'my-job'" in response.message -@pytest.mark.parametrize( - "mock_params,expected_name,expected_params,expected_steps,expected_total_tasks,expected_total_envs,expected_root_envs,template_dict", - [ - pytest.param( - {}, - "template", - [], - ["step1"], - 1, - 0, - [], - { - "specificationVersion": "jobtemplate-2023-09", - "name": "template", - "steps": [ - { - "name": "step1", - "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, - } - ], - }, - id="No parameters or environments", - ), - pytest.param( - {}, - "DefaultValue", - [("NameParam", "DefaultValue")], - ["step1"], - 1, - 0, - [], - { - "specificationVersion": "jobtemplate-2023-09", - "name": "{{Param.NameParam}}", - "parameterDefinitions": [ - {"name": "NameParam", "type": "STRING", "default": "DefaultValue"} - ], - "steps": [ - { - "name": "step1", - "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, - } - ], - }, - id="Default parameters", - ), - pytest.param( - JobParameterValues( - {"NameParam": ParameterValue(type=ParameterValueType.STRING, value="NewName")} - ), - "NewName", - [("NameParam", "NewName")], - ["step1"], - 1, - 0, - [], - { - 
"specificationVersion": "jobtemplate-2023-09", - "name": "{{Param.NameParam}}", - "parameterDefinitions": [ - {"name": "NameParam", "type": "STRING", "default": "DefaultValue"} - ], - "steps": [ - { - "name": "step1", - "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, - } - ], - }, - id="Overwritten parameters", - ), - pytest.param( - {}, - "template", - [], - ["step1"], - 1, - 1, - ["aRootEnv"], - { - "specificationVersion": "jobtemplate-2023-09", - "name": "template", - "jobEnvironments": [{"name": "aRootEnv", "variables": {"variable": "value"}}], - "steps": [ - { - "name": "step1", - "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, - } - ], - }, - id="Root environments only", - ), - pytest.param( - {}, - "template", - [], - ["step1"], - 1, - 1, - [], - { - "specificationVersion": "jobtemplate-2023-09", - "name": "template", - "steps": [ - { - "name": "step1", - "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, - "stepEnvironments": [ - {"name": "aStepEnv", "variables": {"variable": "value"}} - ], - } - ], - }, - id="Step environments only", - ), - pytest.param( - {}, - "template", - [], - ["step1"], - 1, - 2, - ["aRootEnv"], - { - "specificationVersion": "jobtemplate-2023-09", - "name": "template", - "jobEnvironments": [{"name": "aRootEnv", "variables": {"variable": "value"}}], - "steps": [ - { - "name": "step1", - "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, - "stepEnvironments": [ - {"name": "aStepEnv", "variables": {"variable": "value"}} - ], - } - ], - }, - id="Root and Step level environments", +PARAMETRIZE_CASES = ( + pytest.param( + {}, + "template", + [], + ["step1"], + 1, + 0, + [], + { + "specificationVersion": "jobtemplate-2023-09", + "name": "template", + "steps": [ + { + "name": "step1", + "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, + } + ], + }, + id="No parameters or environments", + ), + pytest.param( + {}, + "DefaultValue", + 
[("NameParam", "DefaultValue")], + ["step1"], + 1, + 0, + [], + { + "specificationVersion": "jobtemplate-2023-09", + "name": "{{Param.NameParam}}", + "parameterDefinitions": [ + {"name": "NameParam", "type": "STRING", "default": "DefaultValue"} + ], + "steps": [ + { + "name": "step1", + "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, + } + ], + }, + id="Default parameters", + ), + pytest.param( + JobParameterValues( + {"NameParam": ParameterValue(type=ParameterValueType.STRING, value="NewName")} ), - pytest.param( - {}, - "template", - [], - ["step1", "step2"], - 2, - 2, - [], - { - "specificationVersion": "jobtemplate-2023-09", - "name": "template", - "steps": [ - { - "name": "step1", - "script": { - "actions": {"onRun": {"command": 'echo "We can have lots of fun"'}} - }, - "stepEnvironments": [ - {"name": "step1Env", "variables": {"variable": "value"}} - ], - }, - { - "name": "step2", - "script": { - "actions": {"onRun": {"command": 'echo "There\'s so much we can do"'}} - }, - "stepEnvironments": [ - {"name": "step2Env", "variables": {"variable": "value"}} - ], + "NewName", + [("NameParam", "NewName")], + ["step1"], + 1, + 0, + [], + { + "specificationVersion": "jobtemplate-2023-09", + "name": "{{Param.NameParam}}", + "parameterDefinitions": [ + {"name": "NameParam", "type": "STRING", "default": "DefaultValue"} + ], + "steps": [ + { + "name": "step1", + "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, + } + ], + }, + id="Overwritten parameters", + ), + pytest.param( + {}, + "template", + [], + ["step1"], + 1, + 1, + ["aRootEnv"], + { + "specificationVersion": "jobtemplate-2023-09", + "name": "template", + "jobEnvironments": [{"name": "aRootEnv", "variables": {"variable": "value"}}], + "steps": [ + { + "name": "step1", + "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, + } + ], + }, + id="Root environments only", + ), + pytest.param( + {}, + "template", + [], + ["step1"], + 1, + 1, + [], + { + 
"specificationVersion": "jobtemplate-2023-09", + "name": "template", + "steps": [ + { + "name": "step1", + "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, + "stepEnvironments": [{"name": "aStepEnv", "variables": {"variable": "value"}}], + } + ], + }, + id="Step environments only", + ), + pytest.param( + {}, + "template", + [], + ["step1"], + 1, + 2, + ["aRootEnv"], + { + "specificationVersion": "jobtemplate-2023-09", + "name": "template", + "jobEnvironments": [{"name": "aRootEnv", "variables": {"variable": "value"}}], + "steps": [ + { + "name": "step1", + "script": {"actions": {"onRun": {"command": 'echo "Hello, world!"'}}}, + "stepEnvironments": [{"name": "aStepEnv", "variables": {"variable": "value"}}], + } + ], + }, + id="Root and Step level environments", + ), + pytest.param( + {}, + "template", + [], + ["step1", "step2"], + 2, + 2, + [], + { + "specificationVersion": "jobtemplate-2023-09", + "name": "template", + "steps": [ + { + "name": "step1", + "script": {"actions": {"onRun": {"command": 'echo "We can have lots of fun"'}}}, + "stepEnvironments": [{"name": "step1Env", "variables": {"variable": "value"}}], + }, + { + "name": "step2", + "script": { + "actions": {"onRun": {"command": 'echo "There\'s so much we can do"'}} }, - ], - }, - id="Environments in multiple steps", - ), - ], + "stepEnvironments": [{"name": "step2Env", "variables": {"variable": "value"}}], + }, + ], + }, + id="Environments in multiple steps", + ), +) + + +@pytest.mark.parametrize( + "mock_params,expected_name,expected_params,expected_steps,expected_total_tasks,expected_total_envs,expected_root_envs,template_dict", + PARAMETRIZE_CASES, ) def test_output_job_summary_result_success( mock_params: JobParameterValues, @@ -452,7 +445,7 @@ def test_output_job_summary_result_success( """ Test that `output_summary_result` returns an object with the expected values when called on a Job. 
""" - template = decode_job_template(template=template_dict) + template = decode_job_template(template=template_dict, supported_extensions=[]) job = create_job(job_template=template, job_parameter_values=mock_params) response = output_summary_result(job) diff --git a/test/openjd/test_main.py b/test/openjd/test_main.py index 77004fa..fd95e5a 100644 --- a/test/openjd/test_main.py +++ b/test/openjd/test_main.py @@ -107,7 +107,7 @@ def test_cli_summary_success(mock_summary: Mock, mock_args: list): "taskparam=paramvalue", "--run-dependencies", "--path-mapping-rules", - '[{"source_os": "someOS", "source_path": "some\path", "destination_path": "some/new/path"}]', + '[{"source_os": "someOS", "source_path": "some\\path", "destination_path": "some/new/path"}]', "--output", "json", ], @@ -124,7 +124,7 @@ def test_cli_summary_success(mock_summary: Mock, mock_args: list): '[{"taskparam": "paramvalue"}]', "--run-dependencies", "--path-mapping-rules", - '[{"source_os": "someOS", "source_path": "some\path", "destination_path": "some/new/path"}]', + '[{"source_os": "someOS", "source_path": "some\\path", "destination_path": "some/new/path"}]', "--output", "json", ], @@ -141,7 +141,7 @@ def test_cli_summary_success(mock_summary: Mock, mock_args: list): "1", "--run-dependencies", "--path-mapping-rules", - '[{"source_os": "someOS", "source_path": "some\path", "destination_path": "some/new/path"}]', + '[{"source_os": "someOS", "source_path": "some\\path", "destination_path": "some/new/path"}]', "--output", "json", ],