diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 1b7bfb787..91e1059f1 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -16,4 +16,4 @@ ## Bug Fixes - +- Many long running async tasks including metric streamers in the BatteryPool now have automatic recovery in case of exceptions. diff --git a/src/frequenz/sdk/_internal/_asyncio.py b/src/frequenz/sdk/_internal/_asyncio.py index 1d82f239f..fc71e56f8 100644 --- a/src/frequenz/sdk/_internal/_asyncio.py +++ b/src/frequenz/sdk/_internal/_asyncio.py @@ -5,8 +5,12 @@ import asyncio +import logging from abc import ABC -from typing import Any +from datetime import timedelta +from typing import Any, Callable, Coroutine + +_logger = logging.getLogger(__name__) async def cancel_and_await(task: asyncio.Task[Any]) -> None: @@ -28,6 +32,25 @@ async def cancel_and_await(task: asyncio.Task[Any]) -> None: pass +async def run_forever( + async_callable: Callable[[], Coroutine[Any, Any, None]], + interval: timedelta = timedelta(seconds=1), +) -> None: + """Run a given function forever, restarting it after any exception. + + Args: + async_callable: The async callable to run. + interval: The interval between restarts. + """ + interval_s = interval.total_seconds() + while True: + try: + await async_callable() + except Exception: # pylint: disable=broad-except + _logger.exception("Restarting after exception") + await asyncio.sleep(interval_s) + + class NotSyncConstructible(AssertionError): """Raised when object with async constructor is created in sync way.""" diff --git a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py index 2b3701397..34486cfcc 100644 --- a/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py +++ b/src/frequenz/sdk/microgrid/_data_sourcing/microgrid_api_source.py @@ -18,6 +18,7 @@ MeterData, ) +from ..._internal._asyncio import run_forever from ..._internal._channels import ChannelRegistry from ...microgrid import connection_manager from ...timeseries import Sample @@ -460,7 +461,7 @@ async def _update_streams( self.comp_data_tasks[comp_id].cancel() self.comp_data_tasks[comp_id] = asyncio.create_task( - self._handle_data_stream(comp_id, category) + run_forever(lambda: self._handle_data_stream(comp_id, category)) ) async def add_metric(self, request: ComponentMetricRequest) -> None: diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py index 80817016d..62a33c37a 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_managers/_ev_charger_manager/_ev_charger_manager.py @@ -24,6 +24,7 @@ ) from typing_extensions import override +from ....._internal._asyncio import run_forever from ....._internal._math import is_close_to_zero from .....timeseries import Power, Sample3Phase, Voltage from .... import _data_pipeline, connection_manager @@ -89,7 +90,7 @@ async def start(self) -> None: """Start the ev charger data manager.""" # Need to start a task only if there are EV chargers in the component graph. if self._ev_charger_ids: - self._task = asyncio.create_task(self._run_forever()) + self._task = asyncio.create_task(run_forever(self._run)) @override async def distribute_power(self, request: Request) -> None: @@ -217,15 +218,6 @@ def _act_on_new_data(self, ev_data: EVChargerData) -> dict[int, Power]: ) return {component_id: target_power} - async def _run_forever(self) -> None: - """Run the EV charger manager forever.""" - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception("Recovering from an error in EV charger manager.") - await asyncio.sleep(1.0) - async def _run(self) -> None: # pylint: disable=too-many-locals """Run the main event loop of the EV charger manager.""" api = connection_manager.get().api_client diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py index 5b7e454c8..5285239dd 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_ev_charger_status_tracker.py @@ -17,6 +17,7 @@ ) from typing_extensions import override +from ...._internal._asyncio import run_forever from ....actor._background_service import BackgroundService from ... import connection_manager from ._blocking_status import BlockingStatus @@ -80,7 +81,7 @@ def __init__( # pylint: disable=too-many-arguments @override def start(self) -> None: """Start the status tracker.""" - self._tasks.add(asyncio.create_task(self._run_forever())) + self._tasks.add(asyncio.create_task(run_forever(self._run))) def _is_working(self, ev_data: EVChargerData) -> bool: """Return whether the given data indicates that the component is working.""" @@ -99,17 +100,6 @@ def _is_stale(self, ev_data: EVChargerData) -> bool: stale = now - ev_data.timestamp > self._max_data_age return stale - async def _run_forever(self) -> None: - """Run the status tracker forever.""" - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception( - "Restarting after exception in EVChargerStatusTracker.run()" - ) - await asyncio.sleep(1.0) - def _handle_ev_data(self, ev_data: EVChargerData) -> ComponentStatusEnum: """Handle new EV charger data.""" if self._is_stale(ev_data): diff --git a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py index 658926973..f3ad7ebe5 100644 --- a/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py +++ b/src/frequenz/sdk/microgrid/_power_distributing/_component_status/_pv_inverter_status_tracker.py @@ -12,6 +12,7 @@ from frequenz.client.microgrid import InverterComponentState, InverterData from typing_extensions import override +from ...._internal._asyncio import run_forever from ....actor._background_service import BackgroundService from ... import connection_manager from ._blocking_status import BlockingStatus @@ -76,7 +77,7 @@ def __init__( # pylint: disable=too-many-arguments @override def start(self) -> None: """Start the status tracker.""" - self._tasks.add(asyncio.create_task(self._run_forever())) + self._tasks.add(asyncio.create_task(run_forever(self._run))) def _is_working(self, pv_data: InverterData) -> bool: """Return whether the given data indicates that the PV inverter is working.""" @@ -87,16 +88,6 @@ def _is_working(self, pv_data: InverterData) -> bool: InverterComponentState.STANDBY, ) - async def _run_forever(self) -> None: - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception( - "Restarting after exception in PVInverterStatusTracker.run()" - ) - await asyncio.sleep(1.0) - def _is_stale(self, pv_data: InverterData) -> bool: """Return whether the given data is stale.""" now = datetime.now(tz=timezone.utc) diff --git a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py index 8360632d1..ed7f01c45 100644 --- a/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py +++ b/src/frequenz/sdk/microgrid/_power_managing/_power_managing_actor.py @@ -15,6 +15,7 @@ from frequenz.client.microgrid import ComponentCategory, ComponentType, InverterType from typing_extensions import override +from ..._internal._asyncio import run_forever from ..._internal._channels import ChannelRegistry from ...actor import Actor from ...timeseries import Power @@ -194,7 +195,7 @@ def _add_system_bounds_tracker(self, component_ids: frozenset[int]) -> None: # Start the bounds tracker, for ongoing updates. self._bound_tracker_tasks[component_ids] = asyncio.create_task( - self._bounds_tracker(component_ids, bounds_receiver) + run_forever(lambda: self._bounds_tracker(component_ids, bounds_receiver)) ) def _calculate_shifted_bounds( diff --git a/src/frequenz/sdk/timeseries/battery_pool/_methods.py b/src/frequenz/sdk/timeseries/battery_pool/_methods.py index 8eea713d8..9ad44670b 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_methods.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_methods.py @@ -12,7 +12,7 @@ from frequenz.channels import Broadcast, Receiver -from ..._internal._asyncio import cancel_and_await +from ..._internal._asyncio import cancel_and_await, run_forever from ..._internal._constants import RECEIVER_MAX_SIZE, WAIT_FOR_COMPONENT_DATA_SEC from ...microgrid._power_distributing._component_managers._battery_manager import ( _get_battery_inverter_mappings, @@ -104,8 +104,10 @@ def __init__( self._update_event = asyncio.Event() self._cached_metrics: dict[int, ComponentMetricsData] = {} - self._update_task = asyncio.create_task(self._update_and_notify()) - self._send_task = asyncio.create_task(self._send_on_update(min_update_interval)) + self._update_task = asyncio.create_task(run_forever(self._update_and_notify)) + self._send_task = asyncio.create_task( + run_forever(lambda: self._send_on_update(min_update_interval)) + ) self._pending_data_fetchers: set[asyncio.Task[ComponentMetricsData | None]] = ( set() ) diff --git a/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py b/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py index a5ee0eb05..a4691173c 100644 --- a/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py +++ b/src/frequenz/sdk/timeseries/battery_pool/_metric_calculator.py @@ -397,9 +397,17 @@ def calculate( # # Therefore, the variables are named with a `_x100` suffix. usable_capacity_x100 = capacity * (soc_upper_bound - soc_lower_bound) - soc_scaled = ( - (soc - soc_lower_bound) / (soc_upper_bound - soc_lower_bound) * 100.0 - ) + if math.isclose(soc_upper_bound, soc_lower_bound): + if soc < soc_lower_bound: + soc_scaled = 0.0 + else: + soc_scaled = 100.0 + else: + soc_scaled = ( + (soc - soc_lower_bound) + / (soc_upper_bound - soc_lower_bound) + * 100.0 + ) # we are clamping here because the SoC might be out of bounds soc_scaled = min(max(soc_scaled, 0.0), 100.0) timestamp = max(timestamp, metrics.timestamp) diff --git a/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py b/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py index 12d5799bd..a4e7cb53c 100644 --- a/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py +++ b/src/frequenz/sdk/timeseries/ev_charger_pool/_system_bounds_tracker.py @@ -5,20 +5,18 @@ import asyncio -import logging from collections import abc from frequenz.channels import Receiver, Sender, merge, select, selected_from from frequenz.client.microgrid import EVChargerData +from ..._internal._asyncio import run_forever from ...actor import BackgroundService from ...microgrid import connection_manager from ...microgrid._power_distributing._component_status import ComponentPoolStatus from .. import Power from .._base_types import Bounds, SystemBounds -_logger = logging.getLogger(__name__) - class EVCSystemBoundsTracker(BackgroundService): """Track the system bounds for the EV chargers. @@ -55,7 +53,7 @@ def __init__( def start(self) -> None: """Start the EV charger system bounds tracker.""" - self._tasks.add(asyncio.create_task(self._run_forever())) + self._tasks.add(asyncio.create_task(run_forever(self._run))) async def _send_bounds(self) -> None: """Calculate and send the aggregate system bounds if they have changed.""" @@ -104,17 +102,6 @@ async def _send_bounds(self) -> None: ) await self._bounds_sender.send(self._last_sent_bounds) - async def _run_forever(self) -> None: - """Run the status tracker forever.""" - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception( - "Restarting after exception in EVChargerSystemBoundsTracker.run()" - ) - await asyncio.sleep(1.0) - async def _run(self) -> None: """Run the system bounds tracker.""" api_client = connection_manager.get().api_client diff --git a/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py b/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py index 0c6f803ff..8a8a5c7f5 100644 --- a/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py +++ b/src/frequenz/sdk/timeseries/pv_pool/_system_bounds_tracker.py @@ -4,20 +4,18 @@ """System bounds tracker for PV inverters.""" import asyncio -import logging from collections import abc from frequenz.channels import Receiver, Sender, merge, select, selected_from from frequenz.client.microgrid import InverterData +from ..._internal._asyncio import run_forever from ...actor import BackgroundService from ...microgrid import connection_manager from ...microgrid._power_distributing._component_status import ComponentPoolStatus from .._base_types import Bounds, SystemBounds from .._quantities import Power -_logger = logging.getLogger(__name__) - class PVSystemBoundsTracker(BackgroundService): """Track the system bounds for PV inverters. @@ -54,7 +52,7 @@ def __init__( def start(self) -> None: """Start the PV inverter system bounds tracker.""" - self._tasks.add(asyncio.create_task(self._run_forever())) + self._tasks.add(asyncio.create_task(run_forever(self._run))) async def _send_bounds(self) -> None: """Calculate and send the aggregate system bounds if they have changed.""" @@ -103,17 +101,6 @@ async def _send_bounds(self) -> None: ) await self._bounds_sender.send(self._last_sent_bounds) - async def _run_forever(self) -> None: - """Run the system bounds tracker.""" - while True: - try: - await self._run() - except Exception: # pylint: disable=broad-except - _logger.exception( - "Restarting after exception in PVSystemBoundsTracker.run()" - ) - await asyncio.sleep(1.0) - async def _run(self) -> None: """Run the system bounds tracker.""" api_client = connection_manager.get().api_client