diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 5caf68c..5822106 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -56,7 +56,7 @@ jobs: TMPDIR=$(mktemp -d) git clone https://github.com/LLNL/clippy-cpp --branch ${{ github.head_ref }} $TMPDIR || git clone https://github.com/LLNL/clippy-cpp --branch master $TMPDIR mkdir -p $TMPDIR/build - cd $TMPDIR/build && cmake -DBOOST_ROOT=$BOOST_ROOT .. && make + cd $TMPDIR/build && cmake -DMODERN_CMAKE_BUILD_TESTING=ON -DBUILD_TESTING=ON -DBOOST_ROOT=$BOOST_ROOT .. && make ls -l $TMPDIR/build/test BACKEND=$TMPDIR/build/test echo "BACKEND=$BACKEND" >> $GITHUB_ENV diff --git a/requirements-dev.txt b/requirements-dev.txt index 53c4d33..4cc3f4a 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -4,5 +4,4 @@ mypy pylint>=2.15,<3 pyproj>=3.6,<4 pytest>=7,<8 -tqdm json-logic-qubit diff --git a/requirements.txt b/requirements.txt index efbfc78..2b5133f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ semver >= 3.0 -jsonlogic-py >= 0.1 +jsonlogic-py >= 0.2 diff --git a/src/clippy/backends/fs/__init__.py b/src/clippy/backends/fs/__init__.py index 00b6147..0aa959e 100644 --- a/src/clippy/backends/fs/__init__.py +++ b/src/clippy/backends/fs/__init__.py @@ -158,7 +158,6 @@ def _process_executable(executable: str, cls): unordered_descs = [] for arg, arginfo in sorted(args.items(), key=lambda x: x[1]["position"]): desc = f"{arg}: {arginfo['desc']}" - print(f"in arg loop, {desc=}") if arginfo["position"] != -1: ordered_descs.append(desc) else: @@ -181,7 +180,6 @@ def _process_executable(executable: str, cls): cls.logger.warning( f"Overwriting existing method {method} for class {cls} with executable {executable}" ) - print(f"FINAL docstring for {method} {docstring=}") _define_method(cls, method, executable, docstring, args) return cls @@ -193,8 +191,6 @@ def _define_method( if arguments is None: arguments = dict() - print(f" in define_method, {arguments=}") - def m(self, *args, **kwargs): """ Generic Method that calls an executable with specified arguments diff --git a/src/clippy/backends/fs/execution.py b/src/clippy/backends/fs/execution.py index 56e7ad2..60523b6 100644 --- a/src/clippy/backends/fs/execution.py +++ b/src/clippy/backends/fs/execution.py @@ -5,29 +5,15 @@ from __future__ import annotations import json import logging +import select +import os import subprocess from ...clippy_types import AnyDict from ... import cfg -from ...constants import OUTPUT_KEY -from .constants import ( - DRY_RUN_FLAG, - HELP_FLAG, - PROGRESS_INC_KEY, - PROGRESS_SET_KEY, - PROGRESS_START_KEY, - PROGRESS_END_KEY, -) - -from ...error import ClippyValidationError, ClippyBackendError -from ..serialization import encode_clippy_json, decode_clippy_json - -try: - from tqdm import tqdm +from .constants import DRY_RUN_FLAG, HELP_FLAG - _has_tqdm = True -except ImportError: - _has_tqdm = False +from ..serialization import encode_clippy_json, decode_clippy_json def _stream_exec( @@ -35,7 +21,7 @@ def _stream_exec( submission_dict: AnyDict, logger: logging.Logger, validate: bool, -) -> tuple[AnyDict | None, str | None]: +) -> tuple[AnyDict | None, str | None, int]: """ Internal function. @@ -44,7 +30,7 @@ def _stream_exec( Logs debug messages with progress. Parses the object and returns a dictionary output. - Returns the process result object. + Returns the process result object, stderr, and the process return code. This function is used by _run and _validate. All options (pre_cmd and flags) should already be set. @@ -58,7 +44,8 @@ def _stream_exec( logger.debug("Calling %s with input %s", cmd, cmd_stdin) d = {} - stderr = None + stderr_lines = [] + with subprocess.Popen( cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf8' ) as proc: @@ -68,48 +55,85 @@ def _stream_exec( proc.stdin.write(cmd_stdin + "\n") proc.stdin.flush() - # proc.stdin.close() - it = iter(proc.stdout.readline, '') + proc.stdin.close() + progress = None - for line in it: - d = json.loads(line, object_hook=decode_clippy_json) - if proc.poll() is not None: # process terminated; this shouldn't normally happen. - break - if _has_tqdm: - if progress is None: - if PROGRESS_START_KEY in d: - progress = tqdm() if d[PROGRESS_START_KEY] is None else tqdm(total=d[PROGRESS_START_KEY]) - # print(f"start, total = {d[PROGRESS_START_KEY]}, {progress.n=}") - else: - if PROGRESS_INC_KEY in d: - progress.update(d[PROGRESS_INC_KEY]) - progress.refresh() - # print(f"update {progress.n=}") - if PROGRESS_SET_KEY in d: - progress.n = d[PROGRESS_SET_KEY] - progress.refresh() - if PROGRESS_END_KEY in d: - progress.close() - # print("close") - progress = None - if progress is None: - if OUTPUT_KEY in d: - print(d[OUTPUT_KEY]) - - if proc.stderr is not None: - stderr = "".join(proc.stderr.readlines()) + # Use select with file descriptors and non-blocking reads + stdout_fd = proc.stdout.fileno() + stderr_fd = proc.stderr.fileno() + fds = [stdout_fd, stderr_fd] + + # Set non-blocking mode + os.set_blocking(stdout_fd, False) + os.set_blocking(stderr_fd, False) + + # Line buffers for partial reads + stdout_buffer = "" + stderr_buffer = "" + + while fds or proc.poll() is None: + # Only select if we have fds to monitor + if fds: + readable, _, _ = select.select(fds, [], [], 0.1) + else: + readable = [] + + for fd in readable: + try: + chunk = os.read(fd, 4096) + if not chunk: + # Stream closed (EOF) + fds.remove(fd) + continue + + text = chunk.decode("utf-8") + + if fd == stdout_fd: + stdout_buffer += text + while "\n" in stdout_buffer: + line, stdout_buffer = stdout_buffer.split("\n", 1) + d = json.loads(line, object_hook=decode_clippy_json) + elif fd == stderr_fd: + stderr_buffer += text + while "\n" in stderr_buffer: + line, stderr_buffer = stderr_buffer.split("\n", 1) + stderr_lines.append(line + "\n") + print(line, flush=True) + except BlockingIOError: + # No data available right now + continue + except OSError: + # Stream closed + if fd in fds: + fds.remove(fd) + continue + + # Process any remaining buffered data + if stdout_buffer.strip(): + try: + d = json.loads(stdout_buffer, object_hook=decode_clippy_json) + except json.JSONDecodeError: + pass + + if stderr_buffer.strip(): + stderr_lines.append(stderr_buffer) + print(stderr_buffer.rstrip(), flush=True) + + stderr = "".join(stderr_lines) if stderr_lines else None if progress is not None: progress.close() - if proc.returncode: - raise (ClippyValidationError(stderr) if validate else ClippyBackendError(stderr)) + # if proc.returncode: + # raise (ClippyValidationError(stderr) if validate else ClippyBackendError(stderr)) if not d: - return None, stderr + return None, stderr, proc.returncode if stderr: logger.debug('Received stderr: %s', stderr) + if proc.returncode != 0: + logger.debug("Process returned %d", proc.returncode) logger.debug('run(): final stdout = %s', d) - return (d, stderr) + return (d, stderr, proc.returncode) def _validate( @@ -126,8 +150,8 @@ def _validate( execcmd = cfg.get('validate_cmd_prefix').split() + cmd + [DRY_RUN_FLAG] logger.debug("Validating %s", cmd) - _, stderr = _stream_exec(execcmd, dct, logger, validate=True) - return stderr is not None, stderr or '' + _, stderr, retcode = _stream_exec(execcmd, dct, logger, validate=True) + return retcode == 0, stderr or "" def _run(cmd: str | list[str], dct: AnyDict, logger: logging.Logger) -> AnyDict: @@ -142,7 +166,9 @@ def _run(cmd: str | list[str], dct: AnyDict, logger: logging.Logger) -> AnyDict: logger.debug('Running %s', execcmd) # should we do something with stderr? - output, _ = _stream_exec(execcmd, dct, logger, validate=False) + output, _, retcode = _stream_exec(execcmd, dct, logger, validate=False) + if retcode != 0: + logger.warning("Process returned non-zero return code: %d", retcode) return output or {} @@ -158,5 +184,5 @@ def _help(cmd: str | list[str], dct: AnyDict, logger: logging.Logger) -> AnyDict logger.debug('Running %s', execcmd) # should we do something with stderr? - output, _ = _stream_exec(execcmd, dct, logger, validate=True) + output, _, _ = _stream_exec(execcmd, dct, logger, validate=True) return output or {} diff --git a/src/clippy/config.py b/src/clippy/config.py index 4091e30..0cb6567 100644 --- a/src/clippy/config.py +++ b/src/clippy/config.py @@ -8,21 +8,21 @@ _clippy_cfg: dict[str, CONFIG_ENTRY] = { # backends to use for Clippy. - "backends": ("CLIPPY_BACKENDS", ['fs']), + "backends": ("CLIPPY_BACKENDS", ["fs"]), # semver version restrictions for the backend - "required_versions": ("CLIPPY_REQ_VERSIONS", '>=0.2.0, <0.3.0'), + "required_versions": ("CLIPPY_REQ_VERSIONS", ">=0.2.0"), # command prefix used to specify clippy task management with the HPC cluster # for instance, if using slurm this could be set to 'srun -n1 -ppdebug' - "cmd_prefix": ("CLIPPY_CMD_PREFIX", ''), + "cmd_prefix": ("CLIPPY_CMD_PREFIX", ""), # command prefix used to specify clippy task management with the HPC cluster # for dry runs in certain environments. For instance, if using slurm this # could be set to 'srun -n1 -ppdebug' - "validate_cmd_prefix": ("CLIPPY_VALIDATE_CMD_PREFIX", ''), + "validate_cmd_prefix": ("CLIPPY_VALIDATE_CMD_PREFIX", ""), # contol the log level of clippy "loglevel": ("CLIPPY_LOGLEVEL", logging.WARNING), "logformat": ( "CLIPPY_LOGFORMAT", - '%(asctime)s [%(filename)s:%(lineno)d (%(funcName)s) %(levelname)s: %(message)s', + "%(asctime)s [%(filename)s:%(lineno)d (%(funcName)s) %(levelname)s: %(message)s", ), "logname": ("CLIPPY_LOGNAME", __name__), }