From a0e1f928c4fa83685684f3e21e37d597a77644db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Andersson?= Date: Mon, 14 Mar 2022 08:31:37 +0100 Subject: [PATCH] Add parameter for `on_error` to `BusABC.send_periodic` If there is an error while sending messages in a (background) periodic task, then the `on_error` callback is called, if set. If the callback is configured and returns `True` then the task continues, otherwise it is aborted. While it is possible to update a task with a callback after the task has been created, this procedure is prone to a race condition where the callback might not be configured in time for the first send event. Thus, if the first send fails and the callback has not yet been configured, the task will abort. This commit solves the race condition issue by adding an argument to `BusABC.send_periodic` to specify the callback. By including the callback in the constructor it will be deterministically active for all sends in the task. This fixes issue #1282. This commit also adds myself to the CONTRIBUTORS list. --- CONTRIBUTORS.txt | 3 ++- can/bus.py | 19 ++++++++++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index ae7792e42..7f7f79c70 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -81,4 +81,5 @@ Felix Nieuwenhuizen @fjburgos @pkess @felixn -@Tbruno25 \ No newline at end of file +@Tbruno25 +@andebjor diff --git a/can/bus.py b/can/bus.py index b9ccfcfad..9d9dccbb3 100644 --- a/can/bus.py +++ b/can/bus.py @@ -2,7 +2,7 @@ Contains the ABC bus implementation and its documentation. """ -from typing import cast, Any, Iterator, List, Optional, Sequence, Tuple, Union +from typing import cast, Any, Callable, Iterator, List, Optional, Sequence, Tuple, Union import can.typechecking @@ -181,6 +181,7 @@ def send_periodic( period: float, duration: Optional[float] = None, store_task: bool = True, + on_error: Optional[Callable[[Exception], bool]] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Start sending messages at a given period on this bus. @@ -191,6 +192,9 @@ def send_periodic( - the Bus instance is shutdown - :meth:`BusABC.stop_all_periodic_tasks()` is called - the task's :meth:`CyclicTask.stop()` method is called. + - an error while sending and the (optional) `on_error` callback does + not return `True`. If the callback is not specified the task is + deactivated on error. :param msgs: Message(s) to transmit @@ -202,6 +206,10 @@ def send_periodic( :param store_task: If True (the default) the task will be attached to this Bus instance. Disable to instead manage tasks manually. + :param on_error: + Callable that accepts an exception if any error happened on a `bus` + while sending `msgs`, it shall return either ``True`` or ``False`` + depending on desired behaviour of `ThreadBasedCyclicSendTask`. :return: A started task instance. Note the task can be stopped (and depending on the backend modified) by calling the task's :meth:`stop` method. @@ -232,7 +240,7 @@ def send_periodic( # Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later task = cast( _SelfRemovingCyclicTask, - self._send_periodic_internal(msgs, period, duration), + self._send_periodic_internal(msgs, period, duration, on_error), ) # we wrap the task's stop method to also remove it from the Bus's list of tasks @@ -260,6 +268,7 @@ def _send_periodic_internal( msgs: Union[Sequence[Message], Message], period: float, duration: Optional[float] = None, + on_error: Optional[Callable[[Exception], bool]] = None, ) -> can.broadcastmanager.CyclicSendTaskABC: """Default implementation of periodic message sending using threading. @@ -272,6 +281,10 @@ def _send_periodic_internal( :param duration: The duration between sending each message at the given rate. If no duration is provided, the task will continue indefinitely. + :param on_error: + Callable that accepts an exception if any error happened on a `bus` + while sending `msgs`, it shall return either ``True`` or ``False`` + depending on desired behaviour of `ThreadBasedCyclicSendTask`. :return: A started task instance. Note the task can be stopped (and depending on the backend modified) by calling the :meth:`stop` @@ -283,7 +296,7 @@ def _send_periodic_internal( threading.Lock() ) task = ThreadBasedCyclicSendTask( - self, self._lock_send_periodic, msgs, period, duration + self, self._lock_send_periodic, msgs, period, duration, on_error ) return task