diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 06cc35349..b15038b09 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -49,19 +49,19 @@ jobs: timeout-minutes: 15 if: ${{ !startsWith( matrix.python-version, 'pypy' ) && !startsWith(matrix.os, 'windows') }} run: | - hatch run cov:test --cov-fail-under 50 || hatch run test:test --lf + hatch run cov:test --cov-fail-under 50 - name: Run the tests on pypy timeout-minutes: 15 if: ${{ startsWith( matrix.python-version, 'pypy' ) }} run: | - hatch run test:nowarn || hatch run test:nowarn --lf + hatch run test:nowarn - name: Run the tests on Windows timeout-minutes: 15 if: ${{ startsWith(matrix.os, 'windows') }} run: | - hatch run cov:nowarn || hatch run test:nowarn --lf + hatch run cov:nowarn - name: Check Launcher run: | @@ -144,7 +144,7 @@ jobs: - name: Run the tests timeout-minutes: 15 - run: pytest -W default -vv || pytest --vv -W default --lf + run: pytest -W default -vv test_miniumum_versions: name: Test Minimum Versions @@ -164,7 +164,7 @@ jobs: - name: Run the unit tests run: | - hatch -v run test:nowarn || hatch run test:nowarn --lf + hatch -v run test:nowarn test_prereleases: name: Test Prereleases @@ -179,7 +179,7 @@ jobs: dependency_type: pre - name: Run the tests run: | - hatch run test:nowarn || hatch run test:nowarn --lf + hatch run test:nowarn make_sdist: name: Make SDist diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml index b47c0f06d..42ea51994 100644 --- a/.github/workflows/downstream.yml +++ b/.github/workflows/downstream.yml @@ -104,7 +104,7 @@ jobs: shell: bash -l {0} run: | cd ${GITHUB_WORKSPACE}/.. - git clone https://github.com/jupyter/qtconsole.git + git clone https://github.com/spyder-ide/qtconsole.git cd qtconsole ${pythonLocation}/bin/python -m pip install -e ".[test]" ${pythonLocation}/bin/python -m pip install pyqt5 diff --git a/.readthedocs.yaml b/.readthedocs.yaml index 7ab2c8bf0..8e17caad0 100644 --- a/.readthedocs.yaml +++ b/.readthedocs.yaml @@ -3,7 +3,7 @@ version: 2 build: os: ubuntu-22.04 tools: - python: "3.11" + python: "3.13" sphinx: configuration: docs/conf.py diff --git a/docs/api/ipykernel.comm.rst b/docs/api/ipykernel.comm.rst index a2d529ed0..1cf9ee4e7 100644 --- a/docs/api/ipykernel.comm.rst +++ b/docs/api/ipykernel.comm.rst @@ -7,19 +7,19 @@ Submodules .. automodule:: ipykernel.comm.comm :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.comm.manager :members: - :show-inheritance: :undoc-members: + :show-inheritance: Module contents --------------- .. automodule:: ipykernel.comm :members: - :show-inheritance: :undoc-members: + :show-inheritance: diff --git a/docs/api/ipykernel.inprocess.rst b/docs/api/ipykernel.inprocess.rst index 24d62e7ad..c2d6536bc 100644 --- a/docs/api/ipykernel.inprocess.rst +++ b/docs/api/ipykernel.inprocess.rst @@ -7,49 +7,49 @@ Submodules .. automodule:: ipykernel.inprocess.blocking :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.inprocess.channels :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.inprocess.client :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.inprocess.constants :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.inprocess.ipkernel :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.inprocess.manager :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.inprocess.socket :members: - :show-inheritance: :undoc-members: + :show-inheritance: Module contents --------------- .. automodule:: ipykernel.inprocess :members: - :show-inheritance: :undoc-members: + :show-inheritance: diff --git a/docs/api/ipykernel.rst b/docs/api/ipykernel.rst index 1a3286c8a..0f023070d 100644 --- a/docs/api/ipykernel.rst +++ b/docs/api/ipykernel.rst @@ -16,115 +16,145 @@ Submodules .. automodule:: ipykernel.compiler :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.connect :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.control :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.debugger :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.displayhook :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.embed :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.eventloops :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.heartbeat :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.iostream :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.ipkernel :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.jsonutil :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.kernelapp :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.kernelbase :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.kernelspec :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.log :members: - :show-inheritance: :undoc-members: + :show-inheritance: .. automodule:: ipykernel.parentpoller :members: + :undoc-members: :show-inheritance: + + +.. automodule:: ipykernel.shellchannel + :members: :undoc-members: + :show-inheritance: -.. automodule:: ipykernel.trio_runner +.. automodule:: ipykernel.socket_pair :members: + :undoc-members: :show-inheritance: + + +.. automodule:: ipykernel.subshell + :members: :undoc-members: + :show-inheritance: -.. automodule:: ipykernel.zmqshell +.. automodule:: ipykernel.subshell_manager + :members: + :undoc-members: + :show-inheritance: + + +.. automodule:: ipykernel.thread :members: + :undoc-members: :show-inheritance: + + +.. automodule:: ipykernel.trio_runner + :members: :undoc-members: + :show-inheritance: + + +.. automodule:: ipykernel.zmqshell + :members: + :undoc-members: + :show-inheritance: Module contents --------------- .. automodule:: ipykernel :members: - :show-inheritance: :undoc-members: + :show-inheritance: diff --git a/ipykernel/control.py b/ipykernel/control.py index 0ee0fad05..21d6d9962 100644 --- a/ipykernel/control.py +++ b/ipykernel/control.py @@ -1,32 +1,11 @@ """A thread for a control channel.""" -from threading import Thread -from tornado.ioloop import IOLoop +from .thread import CONTROL_THREAD_NAME, BaseThread -CONTROL_THREAD_NAME = "Control" - -class ControlThread(Thread): +class ControlThread(BaseThread): """A thread for a control channel.""" def __init__(self, **kwargs): """Initialize the thread.""" - Thread.__init__(self, name=CONTROL_THREAD_NAME, **kwargs) - self.io_loop = IOLoop(make_current=False) - self.pydev_do_not_trace = True - self.is_pydev_daemon_thread = True - - def run(self): - """Run the thread.""" - self.name = CONTROL_THREAD_NAME - try: - self.io_loop.start() - finally: - self.io_loop.close() - - def stop(self): - """Stop the thread. - - This method is threadsafe. - """ - self.io_loop.add_callback(self.io_loop.stop) + super().__init__(name=CONTROL_THREAD_NAME, **kwargs) diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index ada46eaa2..22ea82880 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -361,6 +361,11 @@ def set_sigint_result(): # restore the previous sigint handler signal.signal(signal.SIGINT, save_sigint) + @contextmanager + def _dummy_context_manager(self, *args): + # Signals only work in main thread, so cannot use _cancel_on_sigint in subshells. + yield + async def execute_request(self, stream, ident, parent): """Override for cell output - cell reconciliation.""" parent_header = extract_header(parent) @@ -439,7 +444,12 @@ async def run_cell(*args, **kwargs): coro_future = asyncio.ensure_future(coro) - with self._cancel_on_sigint(coro_future): + cm = ( + self._cancel_on_sigint + if threading.current_thread() == threading.main_thread() + else self._dummy_context_manager + ) + with cm(coro_future): # type:ignore[operator] res = None try: res = await coro_future diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 097b65aa9..e3136a03c 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -53,6 +53,7 @@ from .iostream import IOPubThread from .ipkernel import IPythonKernel from .parentpoller import ParentPollerUnix, ParentPollerWindows +from .shellchannel import ShellChannelThread from .zmqshell import ZMQInteractiveShell # ----------------------------------------------------------------------------- @@ -143,6 +144,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, ConnectionFileMix iopub_socket = Any() iopub_thread = Any() control_thread = Any() + shell_channel_thread = Any() _ports = Dict() @@ -367,6 +369,11 @@ def init_control(self, context): self.control_socket.router_handover = 1 self.control_thread = ControlThread(daemon=True) + self.shell_channel_thread = ShellChannelThread( + context, + self.shell_socket, + daemon=True, + ) def init_iopub(self, context): """Initialize the iopub channel.""" @@ -406,6 +413,10 @@ def close(self): self.log.debug("Closing control thread") self.control_thread.stop() self.control_thread.join() + if self.shell_channel_thread and self.shell_channel_thread.is_alive(): + self.log.debug("Closing shell channel thread") + self.shell_channel_thread.stop() + self.shell_channel_thread.join() if self.debugpy_socket and not self.debugpy_socket.closed: self.debugpy_socket.close() @@ -546,10 +557,17 @@ def init_signal(self): def init_kernel(self): """Create the Kernel object itself""" - shell_stream = ZMQStream(self.shell_socket) + if self.shell_channel_thread: + shell_stream = ZMQStream(self.shell_socket, self.shell_channel_thread.io_loop) + else: + shell_stream = ZMQStream(self.shell_socket) control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) debugpy_stream = ZMQStream(self.debugpy_socket, self.control_thread.io_loop) + self.control_thread.start() + if self.shell_channel_thread: + self.shell_channel_thread.start() + kernel_factory = self.kernel_class.instance # type:ignore[attr-defined] kernel = kernel_factory( @@ -560,6 +578,7 @@ def init_kernel(self): debug_shell_socket=self.debug_shell_socket, shell_stream=shell_stream, control_thread=self.control_thread, + shell_channel_thread=self.shell_channel_thread, iopub_thread=self.iopub_thread, iopub_socket=self.iopub_socket, stdin_socket=self.stdin_socket, diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index cb440199e..0c922f2bd 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -20,7 +20,7 @@ from functools import partial from signal import SIGINT, SIGTERM, Signals, default_int_handler, signal -from .control import CONTROL_THREAD_NAME +from .thread import CONTROL_THREAD_NAME if sys.platform != "win32": from signal import SIGKILL @@ -144,6 +144,7 @@ def _shell_streams_changed(self, change): # pragma: no cover debug_shell_socket = Any() control_thread = Any() + shell_channel_thread = Any() iopub_socket = Any() iopub_thread = Any() stdin_socket = Any() @@ -267,6 +268,9 @@ def _parent_header(self): "abort_request", "debug_request", "usage_request", + "create_subshell_request", + "delete_subshell_request", + "list_subshell_request", ] def __init__(self, **kwargs): @@ -346,11 +350,28 @@ def should_handle(self, stream, msg, idents): return False return True - async def dispatch_shell(self, msg): + async def dispatch_shell(self, msg, /, subshell_id: str | None = None): """dispatch shell requests""" + if len(msg) == 1 and msg[0].buffer == b"stop aborting": + # Dummy "stop aborting" message to stop aborting execute requests on this subshell. + # This dummy message implementation allows the subshell to abort messages that are + # already queued in the zmq sockets/streams without having to know any of their + # details in advance. + if subshell_id is None: + self._aborting = False + else: + self.shell_channel_thread.manager.set_subshell_aborting(subshell_id, False) + return + if not self.session: return + if self._supports_kernel_subshells: + assert threading.current_thread() not in ( + self.control_thread, + self.shell_channel_thread, + ) + idents, msg = self.session.feed_identities(msg, copy=False) try: msg = self.session.deserialize(msg, content=True, copy=False) @@ -363,12 +384,25 @@ async def dispatch_shell(self, msg): self._publish_status("busy", "shell") msg_type = msg["header"]["msg_type"] + assert msg["header"].get("subshell_id") == subshell_id + + if self._supports_kernel_subshells: + stream = self.shell_channel_thread.manager.get_subshell_to_shell_channel_socket( + subshell_id + ) + else: + stream = self.shell_stream # Only abort execute requests - if self._aborting and msg_type == "execute_request": - self._send_abort_reply(self.shell_stream, msg, idents) - self._publish_status_and_flush("idle", "shell", self.shell_stream) - return + if msg_type == "execute_request": + if subshell_id is None: + aborting = self._aborting # type:ignore[unreachable] + else: + aborting = self.shell_channel_thread.manager.get_subshell_aborting(subshell_id) + if aborting: + self._send_abort_reply(stream, msg, idents) + self._publish_status_and_flush("idle", "shell", stream) + return # Print some info about this message and leave a '--->' marker, so it's # easier to trace visually the message chain when debugging. Each @@ -376,8 +410,8 @@ async def dispatch_shell(self, msg): self.log.debug("\n*** MESSAGE TYPE:%s***", msg_type) self.log.debug(" Content: %s\n --->\n ", msg["content"]) - if not self.should_handle(self.shell_stream, msg, idents): - self._publish_status_and_flush("idle", "shell", self.shell_stream) + if not self.should_handle(stream, msg, idents): + self._publish_status_and_flush("idle", "shell", stream) return handler = self.shell_handlers.get(msg_type, None) @@ -390,7 +424,7 @@ async def dispatch_shell(self, msg): except Exception: self.log.debug("Unable to signal in pre_handler_hook:", exc_info=True) try: - result = handler(self.shell_stream, idents, msg) + result = handler(stream, idents, msg) if inspect.isawaitable(result): await result except Exception: @@ -406,7 +440,7 @@ async def dispatch_shell(self, msg): sys.stdout.flush() sys.stderr.flush() - self._publish_status_and_flush("idle", "shell", self.shell_stream) + self._publish_status_and_flush("idle", "shell", stream) def pre_handler_hook(self): """Hook to execute before calling message handler""" @@ -540,23 +574,83 @@ def start(self): """register dispatchers for streams""" self.io_loop = ioloop.IOLoop.current() self.msg_queue: Queue[t.Any] = Queue() - self.io_loop.add_callback(self.dispatch_queue) + if not self.shell_channel_thread: + self.io_loop.add_callback(self.dispatch_queue) if self.control_stream: self.control_stream.on_recv(self.dispatch_control, copy=False) if self.shell_stream: - self.shell_stream.on_recv( - partial( - self.schedule_dispatch, - self.dispatch_shell, - ), - copy=False, - ) + if self.shell_channel_thread: + self.shell_channel_thread.manager.set_on_recv_callback(self.shell_main) + self.shell_stream.on_recv(self.shell_channel_thread_main, copy=False) + else: + self.shell_stream.on_recv( + partial( + self.schedule_dispatch, + self.dispatch_shell, + ), + copy=False, + ) # publish idle status self._publish_status("starting", "shell") + async def shell_channel_thread_main(self, msg): + """Handler for shell messages received on shell_channel_thread""" + assert threading.current_thread() == self.shell_channel_thread + + if self.session is None: + return + + # deserialize only the header to get subshell_id + # Keep original message to send to subshell_id unmodified. + _, msg2 = self.session.feed_identities(msg, copy=False) + try: + msg3 = self.session.deserialize(msg2, content=False, copy=False) + subshell_id = msg3["header"].get("subshell_id") + + # Find inproc pair socket to use to send message to correct subshell. + subshell_manager = self.shell_channel_thread.manager + socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id) + assert socket is not None + socket.send_multipart(msg, copy=False) + except Exception: + self.log.error("Invalid message", exc_info=True) # noqa: G201 + + if self.shell_stream: + self.shell_stream.flush() + + async def shell_main(self, subshell_id: str | None, msg): + """Handler of shell messages for a single subshell""" + if self._supports_kernel_subshells: + if subshell_id is None: + assert threading.current_thread() == threading.main_thread() + else: + assert threading.current_thread() not in ( + self.shell_channel_thread, + threading.main_thread(), + ) + socket_pair = self.shell_channel_thread.manager.get_shell_channel_to_subshell_pair( + subshell_id + ) + else: + assert subshell_id is None + assert threading.current_thread() == threading.main_thread() + socket_pair = None + + try: + # Whilst executing a shell message, do not accept any other shell messages on the + # same subshell, so that cells are run sequentially. Without this we can run multiple + # async cells at the same time which would be a nice feature to have but is an API + # change. + if socket_pair: + socket_pair.pause_on_recv() + await self.dispatch_shell(msg, subshell_id=subshell_id) + finally: + if socket_pair: + socket_pair.resume_on_recv() + def record_ports(self, ports): """Record the ports that this kernel is using. @@ -596,7 +690,7 @@ def _publish_status(self, status, channel, parent=None): def _publish_status_and_flush(self, status, channel, stream, parent=None): """send status on IOPub and flush specified stream to ensure reply is sent before handling the next reply""" self._publish_status(status, channel, parent) - if stream: + if stream and hasattr(stream, "flush"): stream.flush(zmq.POLLOUT) def _publish_debug_event(self, event): @@ -741,6 +835,8 @@ async def execute_request(self, stream, ident, parent): if self._do_exec_accepted_params["cell_id"]: do_execute_args["cell_id"] = cell_id + subshell_id = parent["header"].get("subshell_id") + # Call do_execute with the appropriate arguments reply_content = self.do_execute(**do_execute_args) @@ -772,7 +868,8 @@ async def execute_request(self, stream, ident, parent): self.log.debug("%s", reply_msg) if not silent and reply_msg["content"]["status"] == "error" and stop_on_error: - self._abort_queues() + subshell_id = parent["header"].get("subshell_id") + self._abort_queues(subshell_id) def do_execute( self, @@ -877,14 +974,18 @@ async def connect_request(self, stream, ident, parent): @property def kernel_info(self): - return { + info = { "protocol_version": kernel_protocol_version, "implementation": self.implementation, "implementation_version": self.implementation_version, "language_info": self.language_info, "banner": self.banner, "help_links": self.help_links, + "supported_features": [], } + if self._supports_kernel_subshells: + info["supported_features"] = ["kernel subshells"] + return info async def kernel_info_request(self, stream, ident, parent): """Handle a kernel info request.""" @@ -971,7 +1072,8 @@ async def shutdown_request(self, stream, ident, parent): control_io_loop.add_callback(control_io_loop.stop) self.log.debug("Stopping shell ioloop") - if self.shell_stream: + self.io_loop.add_callback(self.io_loop.stop) + if self.shell_stream and self.shell_stream.io_loop != self.io_loop: shell_io_loop = self.shell_stream.io_loop shell_io_loop.add_callback(shell_io_loop.stop) @@ -1063,6 +1165,63 @@ async def usage_request(self, stream, ident, parent): async def do_debug_request(self, msg): raise NotImplementedError + async def create_subshell_request(self, socket, ident, parent) -> None: + if not self.session: + return + if not self._supports_kernel_subshells: + self.log.error("Subshells are not supported by this kernel") + return + + assert threading.current_thread().name == CONTROL_THREAD_NAME + + # This should only be called in the control thread if it exists. + # Request is passed to shell channel thread to process. + control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket + control_socket.send_json({"type": "create"}) + reply = control_socket.recv_json() + self.session.send(socket, "create_subshell_reply", reply, parent, ident) + + async def delete_subshell_request(self, socket, ident, parent) -> None: + if not self.session: + return + if not self._supports_kernel_subshells: + self.log.error("KERNEL SUBSHELLS NOT SUPPORTED") + return + + assert threading.current_thread().name == CONTROL_THREAD_NAME + + try: + content = parent["content"] + subshell_id = content["subshell_id"] + except Exception: + self.log.error("Got bad msg from parent: %s", parent) + return + + # This should only be called in the control thread if it exists. + # Request is passed to shell channel thread to process. + control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket + control_socket.send_json({"type": "delete", "subshell_id": subshell_id}) + reply = control_socket.recv_json() + + self.session.send(socket, "delete_subshell_reply", reply, parent, ident) + + async def list_subshell_request(self, socket, ident, parent) -> None: + if not self.session: + return + if not self._supports_kernel_subshells: + self.log.error("Subshells are not supported by this kernel") + return + + assert threading.current_thread().name == CONTROL_THREAD_NAME + + # This should only be called in the control thread if it exists. + # Request is passed to shell channel thread to process. + control_socket = self.shell_channel_thread.manager.control_to_shell_channel.from_socket + control_socket.send_json({"type": "list"}) + reply = control_socket.recv_json() + + self.session.send(socket, "list_subshell_reply", reply, parent, ident) + # --------------------------------------------------------------------------- # Engine methods (DEPRECATED) # --------------------------------------------------------------------------- @@ -1116,7 +1275,9 @@ async def abort_request(self, stream, ident, parent): # pragma: no cover if isinstance(msg_ids, str): msg_ids = [msg_ids] if not msg_ids: - self._abort_queues() + subshell_id = parent["header"].get("subshell_id") + self._abort_queues(subshell_id) + for mid in msg_ids: self.aborted.add(str(mid)) @@ -1153,12 +1314,34 @@ def _topic(self, topic): _aborting = Bool(False) - def _abort_queues(self): + def _post_dummy_stop_aborting_message(self, subshell_id: str | None) -> None: + """Post a dummy message to the correct subshell that when handled will unset + the _aborting flag. + """ + subshell_manager = self.shell_channel_thread.manager + socket = subshell_manager.get_shell_channel_to_subshell_socket(subshell_id) + assert socket is not None + + msg = b"stop aborting" # Magic string for dummy message. + socket.send(msg, copy=False) + + def _abort_queues(self, subshell_id: str | None = None): # while this flag is true, # execute requests will be aborted - self._aborting = True + + if subshell_id is None: + self._aborting = True + else: + self.shell_channel_thread.manager.set_subshell_aborting(subshell_id, True) self.log.info("Aborting queue") + if self.shell_channel_thread: + # Only really need to do this if there are messages already queued + self.shell_channel_thread.io_loop.add_callback( + self._post_dummy_stop_aborting_message, subshell_id + ) + return + # flush streams, so all currently waiting messages # are added to the queue if self.shell_stream: @@ -1387,3 +1570,7 @@ async def _at_shutdown(self): self.log.debug("%s", self._shutdown_message) if self.control_stream: self.control_stream.flush(zmq.POLLOUT) + + @property + def _supports_kernel_subshells(self): + return self.shell_channel_thread is not None diff --git a/ipykernel/shellchannel.py b/ipykernel/shellchannel.py new file mode 100644 index 000000000..77a02f11a --- /dev/null +++ b/ipykernel/shellchannel.py @@ -0,0 +1,47 @@ +"""A thread for a shell channel.""" +from __future__ import annotations + +from typing import Any + +import zmq + +from .subshell_manager import SubshellManager +from .thread import SHELL_CHANNEL_THREAD_NAME, BaseThread + + +class ShellChannelThread(BaseThread): + """A thread for a shell channel. + + Communicates with shell/subshell threads via pairs of ZMQ inproc sockets. + """ + + def __init__( + self, + context: zmq.Context[Any], + shell_socket: zmq.Socket[Any], + **kwargs, + ): + """Initialize the thread.""" + super().__init__(name=SHELL_CHANNEL_THREAD_NAME, **kwargs) + self._manager: SubshellManager | None = None + self._context = context + self._shell_socket = shell_socket + + @property + def manager(self) -> SubshellManager: + # Lazy initialisation. + if self._manager is None: + self._manager = SubshellManager( + self._context, + self.io_loop, + self._shell_socket, + ) + return self._manager + + def run(self) -> None: + """Run the thread.""" + try: + super().run() + finally: + if self._manager: + self._manager.close() diff --git a/ipykernel/socket_pair.py b/ipykernel/socket_pair.py new file mode 100644 index 000000000..cbdb2cc7b --- /dev/null +++ b/ipykernel/socket_pair.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from typing import Any + +import zmq +from tornado.ioloop import IOLoop +from zmq.eventloop.zmqstream import ZMQStream + + +class SocketPair: + """Pair of ZMQ inproc sockets for one-direction communication between 2 threads. + + One of the threads is always the shell_channel_thread, the other may be the control + thread, main thread or a subshell thread. + """ + + from_socket: zmq.Socket[Any] + to_socket: zmq.Socket[Any] + to_stream: ZMQStream | None = None + on_recv_callback: Any + on_recv_copy: bool + + def __init__(self, context: zmq.Context[Any], name: str): + self.from_socket = context.socket(zmq.PAIR) + self.to_socket = context.socket(zmq.PAIR) + address = self._address(name) + self.from_socket.bind(address) + self.to_socket.connect(address) # Or do I need to do this in another thread? + + def close(self): + self.from_socket.close() + + if self.to_stream is not None: + self.to_stream.close() + self.to_socket.close() + + def on_recv(self, io_loop: IOLoop, on_recv_callback, copy: bool = False): + # io_loop is that of the 'to' thread. + self.on_recv_callback = on_recv_callback + self.on_recv_copy = copy + if self.to_stream is None: + self.to_stream = ZMQStream(self.to_socket, io_loop) + self.resume_on_recv() + + def pause_on_recv(self): + if self.to_stream is not None: + self.to_stream.stop_on_recv() + + def resume_on_recv(self): + if self.to_stream is not None: + self.to_stream.on_recv(self.on_recv_callback, copy=self.on_recv_copy) + + def _address(self, name) -> str: + return f"inproc://subshell{name}" diff --git a/ipykernel/subshell.py b/ipykernel/subshell.py new file mode 100644 index 000000000..34c0778bd --- /dev/null +++ b/ipykernel/subshell.py @@ -0,0 +1,34 @@ +"""A thread for a subshell.""" + +from typing import Any + +import zmq + +from .socket_pair import SocketPair +from .thread import BaseThread + + +class SubshellThread(BaseThread): + """A thread for a subshell.""" + + def __init__( + self, + subshell_id: str, + context: zmq.Context[Any], + **kwargs, + ): + """Initialize the thread.""" + super().__init__(name=f"subshell-{subshell_id}", **kwargs) + + self.shell_channel_to_subshell = SocketPair(context, subshell_id) + self.subshell_to_shell_channel = SocketPair(context, subshell_id + "-reverse") + + # When aborting flag is set, execute_request messages to this subshell will be aborted. + self.aborting = False + + def run(self) -> None: + try: + super().run() + finally: + self.shell_channel_to_subshell.close() + self.subshell_to_shell_channel.close() diff --git a/ipykernel/subshell_manager.py b/ipykernel/subshell_manager.py new file mode 100644 index 000000000..be9dd758e --- /dev/null +++ b/ipykernel/subshell_manager.py @@ -0,0 +1,213 @@ +"""Manager of subshells in a kernel.""" +from __future__ import annotations + +import json +import typing as t +import uuid +from functools import partial +from threading import Lock, current_thread, main_thread + +import zmq +from tornado.ioloop import IOLoop + +from .socket_pair import SocketPair +from .subshell import SubshellThread +from .thread import SHELL_CHANNEL_THREAD_NAME + + +class SubshellManager: + """A manager of subshells. + + Controls the lifetimes of subshell threads and their associated ZMQ sockets and + streams. Runs mostly in the shell channel thread. + + Care needed with threadsafe access here. All write access to the cache occurs in + the shell channel thread so there is only ever one write access at any one time. + Reading of cache information can be performed by other threads, so all reads are + protected by a lock so that they are atomic. + + Sending reply messages via the shell_socket is wrapped by another lock to protect + against multiple subshells attempting to send at the same time. + """ + + def __init__( + self, + context: zmq.Context[t.Any], + shell_channel_io_loop: IOLoop, + shell_socket: zmq.Socket[t.Any], + ): + assert current_thread() == main_thread() + + self._context: zmq.Context[t.Any] = context + self._shell_channel_io_loop = shell_channel_io_loop + self._shell_socket = shell_socket + self._cache: dict[str, SubshellThread] = {} + self._lock_cache = Lock() + self._lock_shell_socket = Lock() + + # Inproc socket pair for communication from control thread to shell channel thread, + # such as for create_subshell_request messages. Reply messages are returned straight away. + self.control_to_shell_channel = SocketPair(self._context, "control") + self.control_to_shell_channel.on_recv( + self._shell_channel_io_loop, self._process_control_request, copy=True + ) + + # Inproc socket pair for communication from shell channel thread to main thread, + # such as for execute_request messages. + self._shell_channel_to_main = SocketPair(self._context, "main") + + # Inproc socket pair for communication from main thread to shell channel thread. + # such as for execute_reply messages. + self._main_to_shell_channel = SocketPair(self._context, "main-reverse") + self._main_to_shell_channel.on_recv( + self._shell_channel_io_loop, self._send_on_shell_channel + ) + + def close(self) -> None: + """Stop all subshells and close all resources.""" + assert current_thread().name == SHELL_CHANNEL_THREAD_NAME + with self._lock_cache: + while True: + try: + _, subshell_thread = self._cache.popitem() + except KeyError: + break + self._stop_subshell(subshell_thread) + + self.control_to_shell_channel.close() + self._main_to_shell_channel.close() + self._shell_channel_to_main.close() + + def get_shell_channel_to_subshell_pair(self, subshell_id: str | None) -> SocketPair: + if subshell_id is None: + return self._shell_channel_to_main + with self._lock_cache: + return self._cache[subshell_id].shell_channel_to_subshell + + def get_subshell_to_shell_channel_socket(self, subshell_id: str | None) -> zmq.Socket[t.Any]: + if subshell_id is None: + return self._main_to_shell_channel.from_socket + with self._lock_cache: + return self._cache[subshell_id].subshell_to_shell_channel.from_socket + + def get_shell_channel_to_subshell_socket(self, subshell_id: str | None) -> zmq.Socket[t.Any]: + return self.get_shell_channel_to_subshell_pair(subshell_id).from_socket + + def get_subshell_aborting(self, subshell_id: str) -> bool: + """Get the aborting flag of the specified subshell.""" + return self._cache[subshell_id].aborting + + def list_subshell(self) -> list[str]: + """Return list of current subshell ids. + + Can be called by any subshell using %subshell magic. + """ + with self._lock_cache: + return list(self._cache) + + def set_on_recv_callback(self, on_recv_callback): + assert current_thread() == main_thread() + self._on_recv_callback = on_recv_callback + self._shell_channel_to_main.on_recv(IOLoop.current(), partial(self._on_recv_callback, None)) + + def set_subshell_aborting(self, subshell_id: str, aborting: bool) -> None: + """Set the aborting flag of the specified subshell.""" + with self._lock_cache: + self._cache[subshell_id].aborting = aborting + + def subshell_id_from_thread_id(self, thread_id: int) -> str | None: + """Return subshell_id of the specified thread_id. + + Raises RuntimeError if thread_id is not the main shell or a subshell. + + Only used by %subshell magic so does not have to be fast/cached. + """ + with self._lock_cache: + if thread_id == main_thread().ident: + return None + for id, subshell in self._cache.items(): + if subshell.ident == thread_id: + return id + msg = f"Thread id {thread_id!r} does not correspond to a subshell of this kernel" + raise RuntimeError(msg) + + def _create_subshell(self) -> str: + """Create and start a new subshell thread.""" + assert current_thread().name == SHELL_CHANNEL_THREAD_NAME + + subshell_id = str(uuid.uuid4()) + subshell_thread = SubshellThread(subshell_id, self._context) + + with self._lock_cache: + assert subshell_id not in self._cache + self._cache[subshell_id] = subshell_thread + + subshell_thread.shell_channel_to_subshell.on_recv( + subshell_thread.io_loop, + partial(self._on_recv_callback, subshell_id), + ) + + subshell_thread.subshell_to_shell_channel.on_recv( + self._shell_channel_io_loop, self._send_on_shell_channel + ) + + subshell_thread.start() + return subshell_id + + def _delete_subshell(self, subshell_id: str) -> None: + """Delete subshell identified by subshell_id. + + Raises key error if subshell_id not in cache. + """ + assert current_thread().name == SHELL_CHANNEL_THREAD_NAME + + with self._lock_cache: + subshell_threwad = self._cache.pop(subshell_id) + + self._stop_subshell(subshell_threwad) + + def _process_control_request( + self, + request: list[t.Any], + ) -> None: + """Process a control request message received on the control inproc + socket and return the reply. Runs in the shell channel thread. + """ + assert current_thread().name == SHELL_CHANNEL_THREAD_NAME + + try: + decoded = json.loads(request[0]) + type = decoded["type"] + reply: dict[str, t.Any] = {"status": "ok"} + + if type == "create": + reply["subshell_id"] = self._create_subshell() + elif type == "delete": + subshell_id = decoded["subshell_id"] + self._delete_subshell(subshell_id) + elif type == "list": + reply["subshell_id"] = self.list_subshell() + else: + msg = f"Unrecognised message type {type!r}" + raise RuntimeError(msg) + except BaseException as err: + reply = { + "status": "error", + "evalue": str(err), + } + + # Return the reply to the control thread. + self.control_to_shell_channel.to_socket.send_json(reply) + + def _send_on_shell_channel(self, msg) -> None: + assert current_thread().name == SHELL_CHANNEL_THREAD_NAME + with self._lock_shell_socket: + self._shell_socket.send_multipart(msg) + + def _stop_subshell(self, subshell_thread: SubshellThread) -> None: + """Stop a subshell thread and close all of its resources.""" + assert current_thread().name == SHELL_CHANNEL_THREAD_NAME + + if subshell_thread.is_alive(): + subshell_thread.stop() + subshell_thread.join() diff --git a/ipykernel/thread.py b/ipykernel/thread.py new file mode 100644 index 000000000..13eb781b7 --- /dev/null +++ b/ipykernel/thread.py @@ -0,0 +1,32 @@ +"""Base class for threads.""" +from threading import Thread + +from tornado.ioloop import IOLoop + +CONTROL_THREAD_NAME = "Control" +SHELL_CHANNEL_THREAD_NAME = "Shell channel" + + +class BaseThread(Thread): + """Base class for threads.""" + + def __init__(self, **kwargs): + """Initialize the thread.""" + super().__init__(**kwargs) + self.io_loop = IOLoop(make_current=False) + self.pydev_do_not_trace = True + self.is_pydev_daemon_thread = True + + def run(self) -> None: + """Run the thread.""" + try: + self.io_loop.start() + finally: + self.io_loop.close() + + def stop(self) -> None: + """Stop the thread. + + This method is threadsafe. + """ + self.io_loop.add_callback(self.io_loop.stop) diff --git a/ipykernel/zmqshell.py b/ipykernel/zmqshell.py index 4fa850735..60682379d 100644 --- a/ipykernel/zmqshell.py +++ b/ipykernel/zmqshell.py @@ -16,9 +16,9 @@ import os import sys +import threading import warnings from pathlib import Path -from threading import local from IPython.core import page, payloadpage from IPython.core.autocall import ZMQExitAutocall @@ -69,7 +69,7 @@ def _flush_streams(self): @default("_thread_local") def _default_thread_local(self): """Initialize our thread local storage""" - return local() + return threading.local() @property def _hooks(self): @@ -439,6 +439,39 @@ def autosave(self, arg_s): else: print("Autosave disabled") + @line_magic + def subshell(self, arg_s): + """ + List all current subshells + """ + from ipykernel.kernelapp import IPKernelApp + + if not IPKernelApp.initialized(): + msg = "Not in a running Kernel" + raise RuntimeError(msg) + + app = IPKernelApp.instance() + kernel = app.kernel + + if not getattr(kernel, "_supports_kernel_subshells", False): + print("Kernel does not support subshells") + return + + thread_id = threading.current_thread().ident + manager = kernel.shell_channel_thread.manager + try: + subshell_id = manager.subshell_id_from_thread_id(thread_id) + except RuntimeError: + subshell_id = "unknown" + subshell_id_list = manager.list_subshell() + + print(f"subshell id: {subshell_id}") + print(f"thread id: {thread_id}") + print(f"main thread id: {threading.main_thread().ident}") + print(f"pid: {os.getpid()}") + print(f"thread count: {threading.active_count()}") + print(f"subshell list: {subshell_id_list}") + class ZMQInteractiveShell(InteractiveShell): """A subclass of InteractiveShell for ZMQ.""" diff --git a/pyproject.toml b/pyproject.toml index b11d572c9..39246b492 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,7 +44,9 @@ Tracker = "https://github.com/ipython/ipykernel/issues" [project.optional-dependencies] docs = [ - "sphinx", + # Sphinx pinned until `sphinx-autodoc-typehints` issue is resolved: + # https://github.com/tox-dev/sphinx-autodoc-typehints/issues/523 + "sphinx<8.2.0", "myst_parser", "pydata_sphinx_theme", "sphinxcontrib_github_alt", @@ -156,7 +158,8 @@ testpaths = [ "tests", ] asyncio_mode = "auto" -timeout = 300 +#timeout = 300 +timeout = 30 # Restore this setting to debug failures #timeout_method = "thread" filterwarnings= [ diff --git a/tests/test_connect.py b/tests/test_connect.py index b4b739d97..118d94dee 100644 --- a/tests/test_connect.py +++ b/tests/test_connect.py @@ -2,7 +2,6 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. - import errno import json import os diff --git a/tests/test_eventloop.py b/tests/test_eventloop.py index 69acc0ea2..76cadd7be 100644 --- a/tests/test_eventloop.py +++ b/tests/test_eventloop.py @@ -124,12 +124,16 @@ def test_cocoa_loop(kernel): loop_cocoa(kernel) -@pytest.mark.skipif( - len(qt_guis_avail) == 0, reason="No viable version of PyQt or PySide installed." -) -def test_qt_enable_gui(kernel, capsys): - gui = qt_guis_avail[0] - +@pytest.mark.parametrize("gui", qt_guis_avail) +def test_qt_enable_gui(gui, kernel, capsys): + if os.getenv("GITHUB_ACTIONS", None) == "true" and gui == "qt5": + pytest.skip("Qt5 and GitHub action crash CPython") + if gui == "qt6" and sys.version_info < (3, 10): + pytest.skip( + "qt6 fails on 3.9 with AttributeError: module 'PySide6.QtPrintSupport' has no attribute 'QApplication'" + ) + if sys.platform == "linux" and gui == "qt6" and os.getenv("GITHUB_ACTIONS", None) == "true": + pytest.skip("qt6 fails on github CI with missing libEGL.so.1") enable_gui(gui, kernel) # We store the `QApplication` instance in the kernel. diff --git a/tests/test_message_spec.py b/tests/test_message_spec.py index 7dd8ad1e2..0459b342b 100644 --- a/tests/test_message_spec.py +++ b/tests/test_message_spec.py @@ -238,6 +238,21 @@ class HistoryReply(Reply): history = List(List()) +# Subshell control messages + + +class CreateSubshellReply(Reply): + subshell_id = Unicode() + + +class DeleteSubshellReply(Reply): + pass + + +class ListSubshellReply(Reply): + subshell_id = List(Unicode()) + + references = { "execute_reply": ExecuteReply(), "inspect_reply": InspectReply(), @@ -254,6 +269,9 @@ class HistoryReply(Reply): "stream": Stream(), "display_data": DisplayData(), "header": RHeader(), + "create_subshell_reply": CreateSubshellReply(), + "delete_subshell_reply": DeleteSubshellReply(), + "list_subshell_reply": ListSubshellReply(), } # ----------------------------------------------------------------------------- @@ -365,7 +383,6 @@ def test_execute_stop_on_error(): KC.execute(code='print("Hello")') KC.execute(code='print("world")') reply = KC.get_shell_msg(timeout=TIMEOUT) - print(reply) reply = KC.get_shell_msg(timeout=TIMEOUT) assert reply["content"]["status"] == "aborted" # second message, too @@ -498,6 +515,8 @@ def test_kernel_info_request(): msg_id = KC.kernel_info() reply = get_reply(KC, msg_id, TIMEOUT) validate_message(reply, "kernel_info_reply", msg_id) + assert "supported_features" in reply["content"] + assert "kernel subshells" in reply["content"]["supported_features"] def test_connect_request(): @@ -509,6 +528,29 @@ def test_connect_request(): validate_message(reply, "connect_reply", msg_id) +def test_subshell(): + flush_channels() + + msg = KC.session.msg("create_subshell_request") + KC.control_channel.send(msg) + msg_id = msg["header"]["msg_id"] + reply = get_reply(KC, msg_id, TIMEOUT, channel="control") + validate_message(reply, "create_subshell_reply", msg_id) + subshell_id = reply["content"]["subshell_id"] + + msg = KC.session.msg("list_subshell_request") + KC.control_channel.send(msg) + msg_id = msg["header"]["msg_id"] + reply = get_reply(KC, msg_id, TIMEOUT, channel="control") + validate_message(reply, "list_subshell_reply", msg_id) + + msg = KC.session.msg("delete_subshell_request", {"subshell_id": subshell_id}) + KC.control_channel.send(msg) + msg_id = msg["header"]["msg_id"] + reply = get_reply(KC, msg_id, TIMEOUT, channel="control") + validate_message(reply, "delete_subshell_reply", msg_id) + + @pytest.mark.skipif( version_info < (5, 0), reason="earlier Jupyter Client don't have comm_info", diff --git a/tests/test_subshells.py b/tests/test_subshells.py new file mode 100644 index 000000000..d1affa666 --- /dev/null +++ b/tests/test_subshells.py @@ -0,0 +1,260 @@ +"""Test kernel subshells.""" + +# Copyright (c) IPython Development Team. +# Distributed under the terms of the Modified BSD License. +from __future__ import annotations + +import platform +import time + +import pytest +from jupyter_client.blocking.client import BlockingKernelClient + +from .utils import TIMEOUT, assemble_output, get_replies, get_reply, new_kernel + +# Helpers + + +def create_subshell_helper(kc: BlockingKernelClient): + msg = kc.session.msg("create_subshell_request") + kc.control_channel.send(msg) + msg_id = msg["header"]["msg_id"] + reply = get_reply(kc, msg_id, TIMEOUT, channel="control") + return reply["content"] + + +def delete_subshell_helper(kc: BlockingKernelClient, subshell_id: str): + msg = kc.session.msg("delete_subshell_request", {"subshell_id": subshell_id}) + kc.control_channel.send(msg) + msg_id = msg["header"]["msg_id"] + reply = get_reply(kc, msg_id, TIMEOUT, channel="control") + return reply["content"] + + +def list_subshell_helper(kc: BlockingKernelClient): + msg = kc.session.msg("list_subshell_request") + kc.control_channel.send(msg) + msg_id = msg["header"]["msg_id"] + reply = get_reply(kc, msg_id, TIMEOUT, channel="control") + return reply["content"] + + +def execute_request(kc: BlockingKernelClient, code: str, subshell_id: str | None): + msg = kc.session.msg("execute_request", {"code": code}) + msg["header"]["subshell_id"] = subshell_id + kc.shell_channel.send(msg) + return msg + + +def execute_request_subshell_id( + kc: BlockingKernelClient, code: str, subshell_id: str | None, terminator: str = "\n" +): + msg = execute_request(kc, code, subshell_id) + msg_id = msg["header"]["msg_id"] + stdout, _ = assemble_output(kc.get_iopub_msg, None, msg_id) + return stdout.strip() + + +def execute_thread_count(kc: BlockingKernelClient) -> int: + code = "print(threading.active_count())" + return int(execute_request_subshell_id(kc, code, None)) + + +def execute_thread_ids(kc: BlockingKernelClient, subshell_id: str | None = None) -> tuple[str, str]: + code = "print(threading.get_ident(), threading.main_thread().ident)" + return execute_request_subshell_id(kc, code, subshell_id).split() + + +# Tests + + +def test_no_subshells(): + with new_kernel() as kc: + # Test operation of separate channel thread without using any subshells. + execute_request_subshell_id(kc, "a = 2*3", None) + res = execute_request_subshell_id(kc, "print(a)", None) + assert res == "6" + + +def test_supported(): + with new_kernel() as kc: + msg_id = kc.kernel_info() + reply = get_reply(kc, msg_id, TIMEOUT) + assert "supported_features" in reply["content"] + assert "kernel subshells" in reply["content"]["supported_features"] + + +def test_subshell_id_lifetime(): + with new_kernel() as kc: + assert list_subshell_helper(kc)["subshell_id"] == [] + subshell_id = create_subshell_helper(kc)["subshell_id"] + assert list_subshell_helper(kc)["subshell_id"] == [subshell_id] + delete_subshell_helper(kc, subshell_id) + assert list_subshell_helper(kc)["subshell_id"] == [] + + +def test_thread_counts(): + with new_kernel() as kc: + execute_request_subshell_id(kc, "import threading", None) + nthreads = execute_thread_count(kc) + + subshell_id = create_subshell_helper(kc)["subshell_id"] + nthreads2 = execute_thread_count(kc) + assert nthreads2 > nthreads + + delete_subshell_helper(kc, subshell_id) + nthreads3 = execute_thread_count(kc) + assert nthreads3 == nthreads + + +def test_thread_ids(): + with new_kernel() as kc: + execute_request_subshell_id(kc, "import threading", None) + subshell_id = create_subshell_helper(kc)["subshell_id"] + + thread_id, main_thread_id = execute_thread_ids(kc) + assert thread_id == main_thread_id + + thread_id, main_thread_id = execute_thread_ids(kc, subshell_id) # This is the problem + assert thread_id != main_thread_id + + delete_subshell_helper(kc, subshell_id) + + +@pytest.mark.parametrize("are_subshells", [(False, True), (True, False), (True, True)]) +@pytest.mark.parametrize("overlap", [True, False]) +def test_run_concurrently_sequence(are_subshells, overlap, request): + if request.config.getvalue("--cov"): + pytest.skip("Skip time-sensitive subshell tests if measuring coverage") + + with new_kernel() as kc: + subshell_ids = [ + create_subshell_helper(kc)["subshell_id"] if is_subshell else None + for is_subshell in are_subshells + ] + + # Import time module before running time-sensitive subshell code + # and use threading.Barrier to synchronise start of subshell code. + execute_request_subshell_id( + kc, "import threading as t, time; b=t.Barrier(2); print('ok')", None + ) + + sleep = 0.5 + if overlap: + codes = [ + f"b.wait(); start0=True; end0=False; time.sleep({sleep}); end0=True", + f"b.wait(); time.sleep({sleep / 2}); assert start0; assert not end0; time.sleep({sleep}); assert end0", + ] + else: + codes = [ + f"b.wait(); start0=True; end0=False; time.sleep({sleep}); assert end1", + f"b.wait(); time.sleep({sleep / 2}); assert start0; assert not end0; end1=True", + ] + + msgs = [] + for subshell_id, code in zip(subshell_ids, codes): + msg = kc.session.msg("execute_request", {"code": code}) + msg["header"]["subshell_id"] = subshell_id + kc.shell_channel.send(msg) + msgs.append(msg) + + replies = get_replies(kc, [msg["msg_id"] for msg in msgs], timeout=None) + + for subshell_id in subshell_ids: + if subshell_id: + delete_subshell_helper(kc, subshell_id) + + for reply in replies: + assert reply["content"]["status"] == "ok", reply + + +def test_create_while_execute(): + with new_kernel() as kc: + # Send request to execute code on main subshell. + msg = kc.session.msg("execute_request", {"code": "import time; time.sleep(0.05)"}) + kc.shell_channel.send(msg) + + # Create subshell via control channel. + control_msg = kc.session.msg("create_subshell_request") + kc.control_channel.send(control_msg) + control_reply = get_reply(kc, control_msg["header"]["msg_id"], TIMEOUT, channel="control") + subshell_id = control_reply["content"]["subshell_id"] + control_date = control_reply["header"]["date"] + + # Get result message from main subshell. + shell_date = get_reply(kc, msg["msg_id"])["header"]["date"] + + delete_subshell_helper(kc, subshell_id) + + assert control_date < shell_date + + +@pytest.mark.skipif( + platform.python_implementation() == "PyPy", + reason="does not work on PyPy", +) +def test_shutdown_with_subshell(): + # Based on test_kernel.py::test_shutdown + with new_kernel() as kc: + km = kc.parent + subshell_id = create_subshell_helper(kc)["subshell_id"] + assert list_subshell_helper(kc)["subshell_id"] == [subshell_id] + kc.shutdown() + for _ in range(100): # 10 s timeout + if km.is_alive(): + time.sleep(0.1) + else: + break + assert not km.is_alive() + + +@pytest.mark.parametrize("are_subshells", [(False, True), (True, False), (True, True)]) +def test_execute_stop_on_error(are_subshells): + # Based on test_message_spec.py::test_execute_stop_on_error, testing that exception + # in one subshell aborts execution queue in that subshell but not others. + with new_kernel() as kc: + subshell_ids = [ + create_subshell_helper(kc)["subshell_id"] if is_subshell else None + for is_subshell in are_subshells + ] + + msg_ids = [] + + msg = execute_request( + kc, "import asyncio; await asyncio.sleep(1); raise ValueError()", subshell_ids[0] + ) + msg_ids.append(msg["msg_id"]) + msg = execute_request(kc, "print('hello')", subshell_ids[0]) + msg_ids.append(msg["msg_id"]) + msg = execute_request(kc, "print('goodbye')", subshell_ids[0]) + msg_ids.append(msg["msg_id"]) + + msg = execute_request(kc, "import time; time.sleep(1.5)", subshell_ids[1]) + msg_ids.append(msg["msg_id"]) + msg = execute_request(kc, "print('other')", subshell_ids[1]) + msg_ids.append(msg["msg_id"]) + + replies = get_replies(kc, msg_ids) + + assert replies[0]["parent_header"]["subshell_id"] == subshell_ids[0] + assert replies[1]["parent_header"]["subshell_id"] == subshell_ids[0] + assert replies[2]["parent_header"]["subshell_id"] == subshell_ids[0] + assert replies[3]["parent_header"]["subshell_id"] == subshell_ids[1] + assert replies[4]["parent_header"]["subshell_id"] == subshell_ids[1] + + assert replies[0]["content"]["status"] == "error" + assert replies[1]["content"]["status"] == "aborted" + assert replies[2]["content"]["status"] == "aborted" + assert replies[3]["content"]["status"] == "ok" + assert replies[4]["content"]["status"] == "ok" + + # Check abort is cleared. + msg = execute_request(kc, "print('check')", subshell_ids[0]) + reply = get_reply(kc, msg["msg_id"]) + assert reply["parent_header"]["subshell_id"] == subshell_ids[0] + assert reply["content"]["status"] == "ok" + + # Cleanup + for subshell_id in subshell_ids: + if subshell_id: + delete_subshell_helper(kc, subshell_id) diff --git a/tests/utils.py b/tests/utils.py index a0880df13..772163992 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -2,6 +2,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +from __future__ import annotations import atexit import os @@ -62,6 +63,24 @@ def get_reply(kc, msg_id, timeout=TIMEOUT, channel="shell"): return reply +def get_replies(kc, msg_ids: list[str], timeout=TIMEOUT, channel="shell"): + # Get replies which may arrive in any order as they may be running on different subshells. + # Replies are returned in the same order as the msg_ids, not in the order of arrival. + count = 0 + replies = [None] * len(msg_ids) + while count < len(msg_ids): + get_msg = getattr(kc, f"get_{channel}_msg") + reply = get_msg(timeout=timeout) + try: + msg_id = reply["parent_header"]["msg_id"] + replies[msg_ids.index(msg_id)] = reply + count += 1 + except ValueError: + # Allow debugging ignored replies + print(f"Ignoring reply not to any of {msg_ids}: {reply}") + return replies + + def execute(code="", kc=None, **kwargs): """wrapper for doing common steps for validating an execution request""" from .test_message_spec import validate_message @@ -149,14 +168,19 @@ def new_kernel(argv=None): return manager.run_kernel(**kwargs) -def assemble_output(get_msg): +def assemble_output(get_msg, timeout=1, parent_msg_id: str | None = None): """assemble stdout/err from an execution""" stdout = "" stderr = "" while True: - msg = get_msg(timeout=1) + msg = get_msg(timeout=timeout) msg_type = msg["msg_type"] content = msg["content"] + + if parent_msg_id is not None and msg["parent_header"]["msg_id"] != parent_msg_id: + # Ignore message for wrong parent message + continue + if msg_type == "status" and content["execution_state"] == "idle": # idle message signals end of output break @@ -173,12 +197,16 @@ def assemble_output(get_msg): return stdout, stderr -def wait_for_idle(kc): +def wait_for_idle(kc, parent_msg_id: str | None = None): while True: msg = kc.get_iopub_msg(timeout=1) msg_type = msg["msg_type"] content = msg["content"] - if msg_type == "status" and content["execution_state"] == "idle": + if ( + msg_type == "status" + and content["execution_state"] == "idle" + and (parent_msg_id is None or msg["parent_header"]["msg_id"] == parent_msg_id) + ): break