Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions pyiron_workflow/executors/wrapped_executorlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,35 @@ class NodeSlurmExecutor(CacheOverride, SlurmClusterExecutor): ...


class _CacheTestClusterExecutor(CacheOverride, TestClusterExecutor): ...


extra_info = """
This is a wrapper around executorlib's executors that is designed to only work with
the submission of :mod:`pyiron_workflow` node calculations, and manipulates the
:mod:`executorlib` caching information to exploit the lexical path of the node.
"""


def wrapped_executorlib_class(wrapped_class: type[BaseExecutor]):
base_doc = wrapped_class.__doc__ or ""
return type(
"Wrapped" + wrapped_class.__name__,
(
CacheOverride,
wrapped_class,
),
{
"__doc__": base_doc + extra_info,
"__module__": wrapped_class.__module__,
},
)


def wrapped_executorlib(
wrapped_class: type[BaseExecutor],
*,
resource_dict: dict[str, Any] | None = None,
**kwargs,
):
kwargs.update({"resource_dict": resource_dict})
return wrapped_executorlib_class(wrapped_class)(**kwargs)
13 changes: 11 additions & 2 deletions tests/integration/test_wrapped_executorlib.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import time
import unittest

import executorlib
from executorlib import api

import pyiron_workflow as pwf
from pyiron_workflow.executors.wrapped_executorlib import (
NodeSingleExecutor,
_CacheTestClusterExecutor,
wrapped_executorlib,
wrapped_executorlib_class,
)


Expand Down Expand Up @@ -81,12 +86,16 @@ def _test_cache(self, executor_class):
)

def test_cache(self):
for executor_class in [NodeSingleExecutor, _CacheTestClusterExecutor]:
for executor_class in [
NodeSingleExecutor,
_CacheTestClusterExecutor,
wrapped_executorlib_class(executorlib.SingleNodeExecutor),
]:
with self.subTest(executor_class.__name__):
self._test_cache(executor_class)

def test_automatic_cleaning(self):
n = pwf.std.UserInput(1)
n.executor = (_CacheTestClusterExecutor, (), {})
n.executor = (wrapped_executorlib, (api.TestClusterExecutor,), {})
n.run()
self.assertFalse(n._wrapped_executorlib_cache_file.is_file())
Loading