Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check if Task(Future) is canceled. #1377

Open
wants to merge 1 commit into
base: rolling
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions rclpy/rclpy/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,14 +321,24 @@ def spin_until_future_complete(
future.add_done_callback(lambda x: self.wake())

if timeout_sec is None or timeout_sec < 0:
while self._context.ok() and not future.done() and not self._is_shutdown:
while (
self._context.ok() and
not future.done() and
not future.cancelled()
and not self._is_shutdown
):
self._spin_once_until_future_complete(future, timeout_sec)
else:
start = time.monotonic()
end = start + timeout_sec
timeout_left = TimeoutObject(timeout_sec)

while self._context.ok() and not future.done() and not self._is_shutdown:
while (
self._context.ok() and
not future.done() and
not future.cancelled()
and not self._is_shutdown
):
self._spin_once_until_future_complete(future, timeout_left)
now = time.monotonic()

Expand Down Expand Up @@ -610,6 +620,8 @@ def _wait_for_ready_callbacks(
with self._tasks_lock:
# Get rid of any tasks that are done
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].done(), self._tasks))
# Get rid of any tasks that are cancelled
self._tasks = list(filter(lambda t_e_n: not t_e_n[0].cancelled(), self._tasks))

# Gather entities that can be waited on
subscriptions: List[Subscription] = []
Expand Down
9 changes: 7 additions & 2 deletions rclpy/rclpy/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __del__(self) -> None:

def __await__(self) -> Generator[None, None, Optional[T]]:
# Yield if the task is not finished
while not self._done:
while not self._done and not self._cancelled:
Copy link

@nadavelkabets nadavelkabets Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Asyncio tasks work a little differently, they have 3 possible states and are considered done if they are not pending:

# States for Future.
_PENDING = 'PENDING'
_CANCELLED = 'CANCELLED'
_FINISHED = 'FINISHED'

https://github.com/python/cpython/blob/ee0746af7d7cfc6cc25441726034e4fea4bcf7e5/Lib/asyncio/base_futures.py#L7-L10

  def done(self):
        """Return True if the future is done.

        Done means either that a result / exception are available, or that the
        future was cancelled.
        """
        return self._state != _PENDING

https://github.com/python/cpython/blob/403410fa1be036214efa7955127911e5592910db/Lib/asyncio/futures.py#L177-L183

@fujitatomoya what do you think?

yield
return self.result()

Expand Down Expand Up @@ -239,7 +239,12 @@ def __call__(self) -> None:

The return value of the handler is stored as the task result.
"""
if self._done or self._executing or not self._task_lock.acquire(blocking=False):
if (
self._done or
self._cancelled or
self._executing or
not self._task_lock.acquire(blocking=False)
):
return
try:
if self._done:
Expand Down
21 changes: 21 additions & 0 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,27 @@ async def coroutine():
self.assertTrue(future.done())
self.assertEqual('Sentinel Result', future.result())

def test_create_task_coroutine_cancel(self) -> None:
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

with this change, all the other tests are passing but this additional one.

this test generates the following warning, eventually this test case fails... i spent some time to see what is going on here, but still not sure about this. requesting help for this.

colcon test --event-handlers console_direct+ --packages-select rclpy --ctest-args -R test_executor

PluggyTeardownRaisedWarning
13: ../../src/ros2/rclpy/rclpy/test/test_executor.py .......F............... [ 82%]
13: .....                                                                    [100%]
13:
13: =================================== FAILURES ===================================
13: ________________ TestExecutor.test_create_task_normal_function _________________
13:
13:     @pytest.hookimpl(hookwrapper=True, tryfirst=True)
13:     def pytest_runtest_call() -> Generator[None, None, None]:
13: >       yield from unraisable_exception_runtest_hook()
13:
13: /usr/lib/python3/dist-packages/_pytest/unraisableexception.py:88:
13: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
13:
13:     def unraisable_exception_runtest_hook() -> Generator[None, None, None]:
13:         with catch_unraisable_exception() as cm:
13:             yield
13:             if cm.unraisable:
13:                 if cm.unraisable.err_msg is not None:
13:                     err_msg = cm.unraisable.err_msg
13:                 else:
13:                     err_msg = "Exception ignored in"
13:                 msg = f"{err_msg}: {cm.unraisable.object!r}\n\n"
13:                 msg += "".join(
13:                     traceback.format_exception(
13:                         cm.unraisable.exc_type,
13:                         cm.unraisable.exc_value,
13:                         cm.unraisable.exc_traceback,
13:                     )
13:                 )
13: >               warnings.warn(pytest.PytestUnraisableExceptionWarning(msg))
13: E               pytest.PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine at 0x7f198fffa980>
13: E
13: E               Traceback (most recent call last):
13: E                 File "/usr/lib/python3.12/warnings.py", line 553, in _warn_unawaited_coroutine
13: E                   warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
13: E               RuntimeWarning: coroutine 'TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine' was never awaited
13:
13: /usr/lib/python3/dist-packages/_pytest/unraisableexception.py:78: PytestUnraisableExceptionWarning
13:
13: During handling of the above exception, another exception occurred:
13:
13: cls = <class '_pytest.runner.CallInfo'>
13: func = <function call_runtest_hook.<locals>.<lambda> at 0x7f198e051f80>
13: when = 'call'
13: reraise = (<class '_pytest.outcomes.Exit'>, <class 'KeyboardInterrupt'>)
13:
13:     @classmethod
13:     def from_call(
13:         cls,
13:         func: "Callable[[], TResult]",
13:         when: "Literal['collect', 'setup', 'call', 'teardown']",
13:         reraise: Optional[
13:             Union[Type[BaseException], Tuple[Type[BaseException], ...]]
13:         ] = None,
13:     ) -> "CallInfo[TResult]":
13:         """Call func, wrapping the result in a CallInfo.
13:
13:         :param func:
13:             The function to call. Called without arguments.
13:         :param when:
13:             The phase in which the function is called.
13:         :param reraise:
13:             Exception or exceptions that shall propagate if raised by the
13:             function, instead of being wrapped in the CallInfo.
13:         """
13:         excinfo = None
13:         start = timing.time()
13:         precise_start = timing.perf_counter()
13:         try:
13: >           result: Optional[TResult] = func()
13:
13: /usr/lib/python3/dist-packages/_pytest/runner.py:341:
13: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
13: /usr/lib/python3/dist-packages/_pytest/runner.py:262: in <lambda>
13:     lambda: ihook(item=item, **kwds), when=when, reraise=reraise
13: /usr/lib/python3/dist-packages/pluggy/_hooks.py:501: in __call__
13:     return self._hookexec(self.name, self._hookimpls.copy(), kwargs, firstresult)
13: /usr/lib/python3/dist-packages/pluggy/_manager.py:119: in _hookexec
13:     return self._inner_hookexec(hook_name, methods, kwargs, firstresult)
13: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
13:
13: hook_name = 'pytest_runtest_call'
13: hook_impl = <HookImpl plugin_name='unraisableexception', plugin=<module '_pytest.unraisableexception' from '/usr/lib/python3/dist-packages/_pytest/unraisableexception.py'>>
13: e = PytestUnraisableExceptionWarning('Exception ignored in: <coroutine object TestExecutor.test_create_task_coroutine_canc...\nRuntimeWarning: coroutine \'TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine\' was never awaited\n')
13:
13:     def _warn_teardown_exception(
13:         hook_name: str, hook_impl: HookImpl, e: BaseException
13:     ) -> None:
13:         msg = "A plugin raised an exception during an old-style hookwrapper teardown.\n"
13:         msg += f"Plugin: {hook_impl.plugin_name}, Hook: {hook_name}\n"
13:         msg += f"{type(e).__name__}: {e}\n"
13:         msg += "For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning"  # noqa: E501
13: >       warnings.warn(PluggyTeardownRaisedWarning(msg), stacklevel=5)
13: E       pluggy.PluggyTeardownRaisedWarning: A plugin raised an exception during an old-style hookwrapper teardown.
13: E       Plugin: unraisableexception, Hook: pytest_runtest_call
13: E       PytestUnraisableExceptionWarning: Exception ignored in: <coroutine object TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine at 0x7f198fffa980>
13: E
13: E       Traceback (most recent call last):
13: E         File "/usr/lib/python3.12/warnings.py", line 553, in _warn_unawaited_coroutine
13: E           warn(msg, category=RuntimeWarning, stacklevel=2, source=coro)
13: E       RuntimeWarning: coroutine 'TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine' was never awaited
13: E
13: E       For more information see https://pluggy.readthedocs.io/en/stable/api_reference.html#pluggy.PluggyTeardownRaisedWarning
13:
13: /usr/lib/python3/dist-packages/pluggy/_callers.py:49: PluggyTeardownRaisedWarning
13: - generated xml file: /root/ros2_ws/colcon_ws/build/rclpy/test_results/rclpy/test_executor.xunit.xml -
13: =========================== short test summary info ============================
13: FAILED ../../src/ros2/rclpy/rclpy/test/test_executor.py::TestExecutor::test_create_task_normal_function
13: ========================= 1 failed, 27 passed in 7.21s =========================
13: Exception ignored in: <function Executor.__del__ at 0x7f19977f7240>
13: Traceback (most recent call last):
13:   File "/root/ros2_ws/colcon_ws/src/ros2/rclpy/rclpy/rclpy/executors.py", line 262, in __del__
13:     self._sigint_gc.destroy()
13:   File "/root/ros2_ws/colcon_ws/src/ros2/rclpy/rclpy/rclpy/signals.py", line 70, in destroy
13:     with self.handle:
13: test_rclpy._rclpy_pybind11.InvalidHandle: cannot use Destroyable because destruction was requested
13: -- run_test.py: return code 1
13: -- run_test.py: verify result file '/root/ros2_ws/colcon_ws/build/rclpy/test_results/rclpy/test_executor.xunit.xml'
1/1 Test #13: test_executor ....................***Failed    7.82 sec

Copy link

@nadavelkabets nadavelkabets Nov 8, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the test raised RuntimeWarning: coroutine 'TestExecutor.test_create_task_coroutine_cancel.<locals>.coroutine' was never awaited.

Previously, because of the bug, a cancelled task was not removed from the executor and was never garbage collected.
After your change, future.cancel() causes the executor to drop the task and the coroutine is never yielded. Some time later, the coroutine object is garbage collected, and since the coroutine is not closed, the exception is raised.

The error "coroutine was never awaited" is common and can be easily replicated:

async def foo():
	pass

coro = foo()
coro.__del__()

RuntimeWarning: coroutine 'foo' was never awaited

Calling coro.close() solves this issue by throwing GeneratorExit into the coroutine https://docs.python.org/3/reference/datamodel.html#coroutine.close

There are the possible solutions I see:

  1. Copying asyncio and cancelling the task by throwing a CancelledError exception into the coroutine.
    The main benefit of this approach is the possibility of user code inside the coroutine to catch the exception and perform cleanup.
    https://github.com/python/cpython/blob/061e50f196373d920c3eaa3718b9d0553914e006/Lib/asyncio/tasks.py#L270-L273
    https://github.com/python/cpython/blob/061e50f196373d920c3eaa3718b9d0553914e006/Lib/asyncio/tasks.py#L283-L307
def cancel(self):
	try:
		if not self._done and iscoroutine(self._handle):
			self._handle.throw(CancelledError())
	finally:
		super().cancel()
  1. Calling coro.close()
def cancel(self):
	if not self._done and iscoroutine(self._handle):
		  self._handle.close()
	super().cancel()
  1. Might work but feels like a patch to me, also skips cleanup and I don't really like that.
warnings.filterwarnings(
	'ignore',
	message=r'^coroutine .* was never awaited$',
	category=RuntimeWarning)

Also, I see that you called asyncio.sleep(1) in your test. It doesn't really matter in this case since this portion of the code is never executed, but await asyncio.sleep(1) is not possible in rclpy, since the implementation calls for asyncio.get_running_loop() but no asyncio loop is initialized or running (asyncio.sleep(0) is a little different as it just yields).
https://github.com/python/cpython/blob/061e50f196373d920c3eaa3718b9d0553914e006/Lib/asyncio/tasks.py#L688-L705

self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

async def coroutine():
await asyncio.sleep(1)
return 'Sentinel Result'

future = executor.create_task(coroutine)
self.assertFalse(future.done())
self.assertFalse(future.cancelled())

future.cancel()
self.assertTrue(future.cancelled())

executor.spin_until_future_complete(future)
self.assertFalse(future.done())
self.assertTrue(future.cancelled())
self.assertEqual(None, future.result())

def test_create_task_normal_function(self) -> None:
self.assertIsNotNone(self.node.handle)
executor = SingleThreadedExecutor(context=self.context)
Expand Down