diff --git a/docs/snippets/static02.py b/docs/snippets/static02.py index 034ddbd62..376777993 100644 --- a/docs/snippets/static02.py +++ b/docs/snippets/static02.py @@ -7,4 +7,4 @@ class TemperatureController(Controller): fastcs = FastCS(TemperatureController(), []) -fastcs.run() +# fastcs.run() # Commented as this will block diff --git a/docs/snippets/static03.py b/docs/snippets/static03.py index 5775d01ca..57d210c07 100644 --- a/docs/snippets/static03.py +++ b/docs/snippets/static03.py @@ -9,4 +9,4 @@ class TemperatureController(Controller): fastcs = FastCS(TemperatureController(), []) -fastcs.run() +# fastcs.run() # Commented as this will block diff --git a/pyproject.toml b/pyproject.toml index 53af0c339..0fe7c5b46 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -20,7 +20,8 @@ dependencies = [ "pytango", "softioc>=4.5.0", "strawberry-graphql", - "p4p" + "p4p", + "IPython", ] dynamic = ["version"] license.file = "LICENSE" diff --git a/src/fastcs/launch.py b/src/fastcs/launch.py index de2c8dad9..a96fdaa45 100644 --- a/src/fastcs/launch.py +++ b/src/fastcs/launch.py @@ -2,10 +2,13 @@ import inspect import json import signal +from collections.abc import Coroutine +from functools import partial from pathlib import Path from typing import Annotated, Any, Optional, TypeAlias, get_type_hints import typer +from IPython.terminal.embed import InteractiveShellEmbed from pydantic import BaseModel, create_model from ruamel.yaml import YAML @@ -36,6 +39,7 @@ def __init__( transport_options: TransportOptions, ): self._loop = asyncio.get_event_loop() + self._controller = controller self._backend = Backend(controller, self._loop) transport: TransportAdapter self._transports: list[TransportAdapter] = [] @@ -100,12 +104,59 @@ def run(self): async def serve(self) -> None: coros = [self._backend.serve()] - coros.extend([transport.serve() for transport in self._transports]) + context = { + "controller": self._controller, + "controller_api": self._backend.controller_api, + "transports": [ + transport.__class__.__name__ for transport in self._transports + ], + } + + for transport in self._transports: + coros.append(transport.serve()) + common_context = context.keys() & transport.context.keys() + if common_context: + raise RuntimeError( + "Duplicate context keys found between " + f"current context { ({k: context[k] for k in common_context}) } " + f"and {transport.__class__.__name__} context: " + f"{ ({k: transport.context[k] for k in common_context}) }" + ) + context.update(transport.context) + + coros.append(self._interactive_shell(context)) + try: await asyncio.gather(*coros) except asyncio.CancelledError: pass + async def _interactive_shell(self, context: dict[str, Any]): + """Spawn interactive shell in another thread and wait for it to complete.""" + + def run(coro: Coroutine[None, None, None]): + """Run coroutine on FastCS event loop from IPython thread.""" + + def wrapper(): + asyncio.create_task(coro) + + self._loop.call_soon_threadsafe(wrapper) + + async def interactive_shell( + context: dict[str, object], stop_event: asyncio.Event + ): + """Run interactive shell in a new thread.""" + shell = InteractiveShellEmbed() + await asyncio.to_thread(partial(shell.mainloop, local_ns=context)) + + stop_event.set() + + context["run"] = run + + stop_event = asyncio.Event() + self._loop.create_task(interactive_shell(context, stop_event)) + await stop_event.wait() + def launch( controller_class: type[Controller], diff --git a/src/fastcs/transport/adapter.py b/src/fastcs/transport/adapter.py index c3ed79143..c872265da 100644 --- a/src/fastcs/transport/adapter.py +++ b/src/fastcs/transport/adapter.py @@ -22,3 +22,7 @@ def create_docs(self) -> None: @abstractmethod def create_gui(self) -> None: pass + + @property + def context(self) -> dict[str, Any]: + return {} diff --git a/src/fastcs/transport/epics/ca/adapter.py b/src/fastcs/transport/epics/ca/adapter.py index f2b001cbe..50601a95c 100644 --- a/src/fastcs/transport/epics/ca/adapter.py +++ b/src/fastcs/transport/epics/ca/adapter.py @@ -40,5 +40,3 @@ def create_gui(self) -> None: async def serve(self) -> None: print(f"Running FastCS IOC: {self._pv_prefix}") self._ioc.run(self._loop) - while True: - await asyncio.sleep(1) diff --git a/tests/example_p4p_ioc.py b/tests/example_p4p_ioc.py index d7ce94745..0736f782f 100644 --- a/tests/example_p4p_ioc.py +++ b/tests/example_p4p_ioc.py @@ -46,6 +46,7 @@ async def d(self): print("D: RUNNING") await asyncio.sleep(0.1) print("D: FINISHED") + await self.j.set(self.j.get() + 1) e: AttrR = AttrR(Bool()) @@ -67,6 +68,9 @@ async def i(self): else: self.fail_on_next_e = True print("I: FINISHED") + await self.j.set(self.j.get() + 1) + + j: AttrR = AttrR(Int()) def run(pv_prefix="P4P_TEST_DEVICE"): diff --git a/tests/test_launch.py b/tests/test_launch.py index cad6041cb..9b1aa4b4f 100644 --- a/tests/test_launch.py +++ b/tests/test_launch.py @@ -145,3 +145,17 @@ def test_get_schema(data): ref_schema = YAML(typ="safe").load(data / "schema.json") assert target_schema == ref_schema + + +def test_error_if_identical_context_in_transports(mocker: MockerFixture, data): + mocker.patch("fastcs.launch.FastCS.create_gui") + mocker.patch("fastcs.launch.FastCS.create_docs") + mocker.patch( + "fastcs.transport.adapter.TransportAdapter.context", + new_callable=mocker.PropertyMock, + return_value={"controller": "test"}, + ) + app = _launch(IsHinted) + result = runner.invoke(app, ["run", str(data / "config.yaml")]) + assert isinstance(result.exception, RuntimeError) + assert "Duplicate context keys found" in result.exception.args[0] diff --git a/tests/transport/epics/pva/test_p4p.py b/tests/transport/epics/pva/test_p4p.py index 16479c199..12460613e 100644 --- a/tests/transport/epics/pva/test_p4p.py +++ b/tests/transport/epics/pva/test_p4p.py @@ -60,6 +60,7 @@ async def test_ioc(p4p_subprocess: tuple[str, Queue]): "g": {"rw": f"{pv_prefix}:Child1:G"}, "h": {"rw": f"{pv_prefix}:Child1:H"}, "i": {"x": f"{pv_prefix}:Child1:I"}, + "j": {"r": f"{pv_prefix}:Child1:J"}, } @@ -104,31 +105,29 @@ async def test_scan_method(p4p_subprocess: tuple[str, Queue]): @pytest.mark.asyncio async def test_command_method(p4p_subprocess: tuple[str, Queue]): - QUEUE_TIMEOUT = 1 - pv_prefix, stdout_queue = p4p_subprocess + pv_prefix, _ = p4p_subprocess d_values = asyncio.Queue() i_values = asyncio.Queue() + j_values = asyncio.Queue() ctxt = Context("pva") d_monitor = ctxt.monitor(f"{pv_prefix}:Child1:D", d_values.put) i_monitor = ctxt.monitor(f"{pv_prefix}:Child1:I", i_values.put) + j_monitor = ctxt.monitor(f"{pv_prefix}:Child1:J", j_values.put) try: - if not stdout_queue.empty(): - raise RuntimeError("stdout_queue not empty", stdout_queue.get()) + j_initial_value = await j_values.get() assert (await d_values.get()).raw.value is False await ctxt.put(f"{pv_prefix}:Child1:D", True) assert (await d_values.get()).raw.value is True + # D process hangs for 0.1s, so we wait slightly longer await asyncio.sleep(0.2) + # Value returns to False, signifying completed process assert (await d_values.get()).raw.value is False - - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "D: RUNNING" - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n" - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "D: FINISHED" - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n" + # D process increments J by 1 + assert (await j_values.get()).raw.value == j_initial_value + 1 # First run fails - assert stdout_queue.empty() before_command_value = (await i_values.get()).raw assert before_command_value["value"] is False assert before_command_value["alarm"]["severity"] == 0 @@ -143,30 +142,26 @@ async def test_command_method(p4p_subprocess: tuple[str, Queue]): assert ( after_command_value["alarm"]["message"] == "I: FAILED WITH THIS WEIRD ERROR" ) - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "I: RUNNING" - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n" + # Failed I process does not increment J + assert j_values.empty() # Second run succeeds - assert stdout_queue.empty() await ctxt.put(f"{pv_prefix}:Child1:I", True) assert (await i_values.get()).raw.value is True await asyncio.sleep(0.2) after_command_value = (await i_values.get()).raw + # Successful I process increments J by 1 + assert (await j_values.get()).raw.value == j_initial_value + 2 # On the second run the command succeeded so we left the error state assert after_command_value["value"] is False assert after_command_value["alarm"]["severity"] == 0 assert after_command_value["alarm"]["message"] == "" - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "I: RUNNING" - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n" - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "I: FINISHED" - assert stdout_queue.get(timeout=QUEUE_TIMEOUT) == "\n" - assert stdout_queue.empty() - finally: d_monitor.close() i_monitor.close() + j_monitor.close() @pytest.mark.asyncio