Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
TMPDIR=$(mktemp -d)
git clone https://github.com/LLNL/clippy-cpp --branch ${{ github.head_ref }} $TMPDIR || git clone https://github.com/LLNL/clippy-cpp --branch master $TMPDIR
mkdir -p $TMPDIR/build
cd $TMPDIR/build && cmake -DBOOST_ROOT=$BOOST_ROOT .. && make
cd $TMPDIR/build && cmake -DMODERN_CMAKE_BUILD_TESTING=ON -DBUILD_TESTING=ON -DBOOST_ROOT=$BOOST_ROOT .. && make
ls -l $TMPDIR/build/test
BACKEND=$TMPDIR/build/test
echo "BACKEND=$BACKEND" >> $GITHUB_ENV
Expand Down
1 change: 0 additions & 1 deletion requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ mypy
pylint>=2.15,<3
pyproj>=3.6,<4
pytest>=7,<8
tqdm
json-logic-qubit
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
semver >= 3.0
jsonlogic-py >= 0.1
jsonlogic-py >= 0.2
4 changes: 0 additions & 4 deletions src/clippy/backends/fs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ def _process_executable(executable: str, cls):
unordered_descs = []
for arg, arginfo in sorted(args.items(), key=lambda x: x[1]["position"]):
desc = f"{arg}: {arginfo['desc']}"
print(f"in arg loop, {desc=}")
if arginfo["position"] != -1:
ordered_descs.append(desc)
else:
Expand All @@ -181,7 +180,6 @@ def _process_executable(executable: str, cls):
cls.logger.warning(
f"Overwriting existing method {method} for class {cls} with executable {executable}"
)
print(f"FINAL docstring for {method} {docstring=}")
_define_method(cls, method, executable, docstring, args)
return cls

Expand All @@ -193,8 +191,6 @@ def _define_method(
if arguments is None:
arguments = dict()

print(f" in define_method, {arguments=}")

def m(self, *args, **kwargs):
"""
Generic Method that calls an executable with specified arguments
Expand Down
142 changes: 84 additions & 58 deletions src/clippy/backends/fs/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,23 @@
from __future__ import annotations
import json
import logging
import select
import os

import subprocess
from ...clippy_types import AnyDict
from ... import cfg
from ...constants import OUTPUT_KEY
from .constants import (
DRY_RUN_FLAG,
HELP_FLAG,
PROGRESS_INC_KEY,
PROGRESS_SET_KEY,
PROGRESS_START_KEY,
PROGRESS_END_KEY,
)

from ...error import ClippyValidationError, ClippyBackendError
from ..serialization import encode_clippy_json, decode_clippy_json

try:
from tqdm import tqdm
from .constants import DRY_RUN_FLAG, HELP_FLAG

_has_tqdm = True
except ImportError:
_has_tqdm = False
from ..serialization import encode_clippy_json, decode_clippy_json


def _stream_exec(
cmd: list[str],
submission_dict: AnyDict,
logger: logging.Logger,
validate: bool,
) -> tuple[AnyDict | None, str | None]:
) -> tuple[AnyDict | None, str | None, int]:
"""
Internal function.

Expand All @@ -44,7 +30,7 @@ def _stream_exec(

Logs debug messages with progress.
Parses the object and returns a dictionary output.
Returns the process result object.
Returns the process result object, stderr, and the process return code.

This function is used by _run and _validate. All options (pre_cmd and flags) should
already be set.
Expand All @@ -58,7 +44,8 @@ def _stream_exec(
logger.debug("Calling %s with input %s", cmd, cmd_stdin)

d = {}
stderr = None
stderr_lines = []

with subprocess.Popen(
cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf8'
) as proc:
Expand All @@ -68,48 +55,85 @@ def _stream_exec(

proc.stdin.write(cmd_stdin + "\n")
proc.stdin.flush()
# proc.stdin.close()
it = iter(proc.stdout.readline, '')
proc.stdin.close()

progress = None
for line in it:
d = json.loads(line, object_hook=decode_clippy_json)
if proc.poll() is not None: # process terminated; this shouldn't normally happen.
break
if _has_tqdm:
if progress is None:
if PROGRESS_START_KEY in d:
progress = tqdm() if d[PROGRESS_START_KEY] is None else tqdm(total=d[PROGRESS_START_KEY])
# print(f"start, total = {d[PROGRESS_START_KEY]}, {progress.n=}")
else:
if PROGRESS_INC_KEY in d:
progress.update(d[PROGRESS_INC_KEY])
progress.refresh()
# print(f"update {progress.n=}")
if PROGRESS_SET_KEY in d:
progress.n = d[PROGRESS_SET_KEY]
progress.refresh()
if PROGRESS_END_KEY in d:
progress.close()
# print("close")
progress = None
if progress is None:
if OUTPUT_KEY in d:
print(d[OUTPUT_KEY])

if proc.stderr is not None:
stderr = "".join(proc.stderr.readlines())
# Use select with file descriptors and non-blocking reads
stdout_fd = proc.stdout.fileno()
stderr_fd = proc.stderr.fileno()
fds = [stdout_fd, stderr_fd]

# Set non-blocking mode
os.set_blocking(stdout_fd, False)
os.set_blocking(stderr_fd, False)

# Line buffers for partial reads
stdout_buffer = ""
stderr_buffer = ""

while fds or proc.poll() is None:
# Only select if we have fds to monitor
if fds:
readable, _, _ = select.select(fds, [], [], 0.1)
else:
readable = []

for fd in readable:
try:
chunk = os.read(fd, 4096)
if not chunk:
# Stream closed (EOF)
fds.remove(fd)
continue

text = chunk.decode("utf-8")

if fd == stdout_fd:
stdout_buffer += text
while "\n" in stdout_buffer:
line, stdout_buffer = stdout_buffer.split("\n", 1)
d = json.loads(line, object_hook=decode_clippy_json)
elif fd == stderr_fd:
stderr_buffer += text
while "\n" in stderr_buffer:
line, stderr_buffer = stderr_buffer.split("\n", 1)
stderr_lines.append(line + "\n")
print(line, flush=True)
except BlockingIOError:
# No data available right now
continue
except OSError:
# Stream closed
if fd in fds:
fds.remove(fd)
continue

# Process any remaining buffered data
if stdout_buffer.strip():
try:
d = json.loads(stdout_buffer, object_hook=decode_clippy_json)
except json.JSONDecodeError:
pass

if stderr_buffer.strip():
stderr_lines.append(stderr_buffer)
print(stderr_buffer.rstrip(), flush=True)

stderr = "".join(stderr_lines) if stderr_lines else None
if progress is not None:
progress.close()
if proc.returncode:
raise (ClippyValidationError(stderr) if validate else ClippyBackendError(stderr))
# if proc.returncode:
# raise (ClippyValidationError(stderr) if validate else ClippyBackendError(stderr))

if not d:
return None, stderr
return None, stderr, proc.returncode
if stderr:
logger.debug('Received stderr: %s', stderr)
if proc.returncode != 0:
logger.debug("Process returned %d", proc.returncode)
logger.debug('run(): final stdout = %s', d)

return (d, stderr)
return (d, stderr, proc.returncode)


def _validate(
Expand All @@ -126,8 +150,8 @@ def _validate(
execcmd = cfg.get('validate_cmd_prefix').split() + cmd + [DRY_RUN_FLAG]
logger.debug("Validating %s", cmd)

_, stderr = _stream_exec(execcmd, dct, logger, validate=True)
return stderr is not None, stderr or ''
_, stderr, retcode = _stream_exec(execcmd, dct, logger, validate=True)
return retcode == 0, stderr or ""


def _run(cmd: str | list[str], dct: AnyDict, logger: logging.Logger) -> AnyDict:
Expand All @@ -142,7 +166,9 @@ def _run(cmd: str | list[str], dct: AnyDict, logger: logging.Logger) -> AnyDict:
logger.debug('Running %s', execcmd)
# should we do something with stderr?

output, _ = _stream_exec(execcmd, dct, logger, validate=False)
output, _, retcode = _stream_exec(execcmd, dct, logger, validate=False)
if retcode != 0:
logger.warning("Process returned non-zero return code: %d", retcode)
return output or {}


Expand All @@ -158,5 +184,5 @@ def _help(cmd: str | list[str], dct: AnyDict, logger: logging.Logger) -> AnyDict
logger.debug('Running %s', execcmd)
# should we do something with stderr?

output, _ = _stream_exec(execcmd, dct, logger, validate=True)
output, _, _ = _stream_exec(execcmd, dct, logger, validate=True)
return output or {}
10 changes: 5 additions & 5 deletions src/clippy/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@

_clippy_cfg: dict[str, CONFIG_ENTRY] = {
# backends to use for Clippy.
"backends": ("CLIPPY_BACKENDS", ['fs']),
"backends": ("CLIPPY_BACKENDS", ["fs"]),
# semver version restrictions for the backend
"required_versions": ("CLIPPY_REQ_VERSIONS", '>=0.2.0, <0.3.0'),
"required_versions": ("CLIPPY_REQ_VERSIONS", ">=0.2.0"),
# command prefix used to specify clippy task management with the HPC cluster
# for instance, if using slurm this could be set to 'srun -n1 -ppdebug'
"cmd_prefix": ("CLIPPY_CMD_PREFIX", ''),
"cmd_prefix": ("CLIPPY_CMD_PREFIX", ""),
# command prefix used to specify clippy task management with the HPC cluster
# for dry runs in certain environments. For instance, if using slurm this
# could be set to 'srun -n1 -ppdebug'
"validate_cmd_prefix": ("CLIPPY_VALIDATE_CMD_PREFIX", ''),
"validate_cmd_prefix": ("CLIPPY_VALIDATE_CMD_PREFIX", ""),
# contol the log level of clippy
"loglevel": ("CLIPPY_LOGLEVEL", logging.WARNING),
"logformat": (
"CLIPPY_LOGFORMAT",
'%(asctime)s [%(filename)s:%(lineno)d (%(funcName)s) %(levelname)s: %(message)s',
"%(asctime)s [%(filename)s:%(lineno)d (%(funcName)s) %(levelname)s: %(message)s",
),
"logname": ("CLIPPY_LOGNAME", __name__),
}
Loading