Skip to content

Commit 2606895

Browse files
authored
Dispatch Managing Actor (#54)
- **Reset release notes** - **Add dispatch runner**
2 parents 2c4e16a + 039d22e commit 2606895

File tree

8 files changed

+844
-150
lines changed

8 files changed

+844
-150
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
## New Features
1212

13-
<!-- Here goes the main new features and examples or instructions on how to use them -->
13+
* We now provide the `DispatchManagingActor` class, a class to manage actors based on incoming dispatches.
1414

1515
## Bug Fixes
1616

pyproject.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ dependencies = [
3939
# Make sure to update the version for cross-referencing also in the
4040
# mkdocs.yml file when changing the version here (look for the config key
4141
# plugins.mkdocstrings.handlers.python.import)
42-
"frequenz-sdk >= 1.0.0-rc900, < 1.0.0-rc1000",
43-
"frequenz-channels >= 1.1.0, < 2.0.0",
44-
"frequenz-client-dispatch >= 0.6.0, < 0.7.0",
42+
"frequenz-sdk == 1.0.0-rc900, < 1.0.0-rc1000",
43+
"frequenz-channels >= 1.2.0, < 2.0.0",
44+
"frequenz-client-dispatch >= 0.7.0, < 0.8.0",
4545
]
4646
dynamic = ["version"]
4747

@@ -165,6 +165,7 @@ disable = [
165165
[tool.pytest.ini_options]
166166
testpaths = ["tests", "src"]
167167
asyncio_mode = "auto"
168+
asyncio_default_fixture_loop_scope = "function"
168169
required_plugins = ["pytest-asyncio", "pytest-mock"]
169170

170171
[tool.mypy]

src/frequenz/dispatch/__init__.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
88
* [Dispatcher][frequenz.dispatch.Dispatcher]: The entry point for the API.
99
* [Dispatch][frequenz.dispatch.Dispatch]: A dispatch type with lots of useful extra functionality.
10+
* [DispatchManagingActor][frequenz.dispatch.DispatchManagingActor]: An actor to
11+
manage other actors based on incoming dispatches.
1012
* [Created][frequenz.dispatch.Created],
1113
[Updated][frequenz.dispatch.Updated],
1214
[Deleted][frequenz.dispatch.Deleted]: Dispatch event types.
@@ -16,6 +18,7 @@
1618
from ._dispatch import Dispatch, RunningState
1719
from ._dispatcher import Dispatcher, ReceiverFetcher
1820
from ._event import Created, Deleted, DispatchEvent, Updated
21+
from ._managing_actor import DispatchManagingActor, DispatchUpdate
1922

2023
__all__ = [
2124
"Created",
@@ -26,4 +29,6 @@
2629
"Updated",
2730
"Dispatch",
2831
"RunningState",
32+
"DispatchManagingActor",
33+
"DispatchUpdate",
2934
]

src/frequenz/dispatch/_dispatch.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,13 @@ def running(self, type_: str) -> RunningState:
118118
return RunningState.STOPPED
119119

120120
now = datetime.now(tz=timezone.utc)
121+
122+
if now < self.start_time:
123+
return RunningState.STOPPED
124+
# A dispatch without duration is always running once it started
125+
if self.duration is None:
126+
return RunningState.RUNNING
127+
121128
if until := self._until(now):
122129
return RunningState.RUNNING if now < until else RunningState.STOPPED
123130

@@ -185,6 +192,7 @@ def next_run_after(self, after: datetime) -> datetime | None:
185192
if (
186193
not self.recurrence.frequency
187194
or self.recurrence.frequency == Frequency.UNSPECIFIED
195+
or self.duration is None # Infinite duration
188196
):
189197
if after > self.start_time:
190198
return None
@@ -236,7 +244,13 @@ def _until(self, now: datetime) -> datetime | None:
236244
237245
Returns:
238246
The time when the dispatch should end or None if the dispatch is not running.
247+
248+
Raises:
249+
ValueError: If the dispatch has no duration.
239250
"""
251+
if self.duration is None:
252+
raise ValueError("_until: Dispatch has no duration")
253+
240254
if (
241255
not self.recurrence.frequency
242256
or self.recurrence.frequency == Frequency.UNSPECIFIED
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
# License: All rights reserved
2+
# Copyright © 2024 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Helper class to manage actors based on dispatches."""
5+
6+
import logging
7+
from dataclasses import dataclass
8+
from typing import Any, Set
9+
10+
from frequenz.channels import Receiver, Sender
11+
from frequenz.client.dispatch.types import ComponentSelector
12+
from frequenz.sdk.actor import Actor
13+
14+
from ._dispatch import Dispatch, RunningState
15+
16+
_logger = logging.getLogger(__name__)
17+
18+
19+
@dataclass(frozen=True, kw_only=True)
20+
class DispatchUpdate:
21+
"""Event emitted when the dispatch changes."""
22+
23+
components: ComponentSelector
24+
"""Components to be used."""
25+
26+
dry_run: bool
27+
"""Whether this is a dry run."""
28+
29+
options: dict[str, Any]
30+
"""Additional options."""
31+
32+
33+
class DispatchManagingActor(Actor):
34+
"""Helper class to manage actors based on dispatches.
35+
36+
Example usage:
37+
38+
```python
39+
import os
40+
import asyncio
41+
from frequenz.dispatch import Dispatcher, DispatchManagingActor, DispatchUpdate
42+
from frequenz.client.dispatch.types import ComponentSelector
43+
from frequenz.client.common.microgrid.components import ComponentCategory
44+
45+
from frequenz.channels import Receiver, Broadcast
46+
47+
class MyActor(Actor):
48+
def __init__(self, updates_channel: Receiver[DispatchUpdate]):
49+
super().__init__()
50+
self._updates_channel = updates_channel
51+
self._dry_run: bool
52+
self._options : dict[str, Any]
53+
54+
async def _run(self) -> None:
55+
while True:
56+
update = await self._updates_channel.receive()
57+
print("Received update:", update)
58+
59+
self.set_components(update.components)
60+
self._dry_run = update.dry_run
61+
self._options = update.options
62+
63+
def set_components(self, components: ComponentSelector) -> None:
64+
match components:
65+
case [int(), *_] as component_ids:
66+
print("Dispatch: Setting components to %s", components)
67+
case [ComponentCategory.BATTERY, *_]:
68+
print("Dispatch: Using all battery components")
69+
case unsupported:
70+
print(
71+
"Dispatch: Requested an unsupported selector %r, "
72+
"but only component IDs or category BATTERY are supported.",
73+
unsupported,
74+
)
75+
76+
async def run():
77+
url = os.getenv("DISPATCH_API_URL", "grpc://fz-0004.frequenz.io:50051")
78+
key = os.getenv("DISPATCH_API_KEY", "some-key")
79+
80+
microgrid_id = 1
81+
82+
dispatcher = Dispatcher(
83+
microgrid_id=microgrid_id,
84+
server_url=url,
85+
key=key
86+
)
87+
88+
# Create update channel to receive dispatch update events pre-start and mid-run
89+
dispatch_updates_channel = Broadcast[DispatchUpdate](name="dispatch_updates_channel")
90+
91+
# Start actor and give it an dispatch updates channel receiver
92+
my_actor = MyActor(dispatch_updates_channel.new_receiver())
93+
94+
status_receiver = dispatcher.running_status_change.new_receiver()
95+
96+
managing_actor = DispatchManagingActor(
97+
actor=my_actor,
98+
dispatch_type="EXAMPLE",
99+
running_status_receiver=status_receiver,
100+
updates_sender=dispatch_updates_channel.new_sender(),
101+
)
102+
103+
await asyncio.gather(dispatcher.start(), managing_actor.start())
104+
```
105+
"""
106+
107+
def __init__(
108+
self,
109+
actor: Actor | Set[Actor],
110+
dispatch_type: str,
111+
running_status_receiver: Receiver[Dispatch],
112+
updates_sender: Sender[DispatchUpdate] | None = None,
113+
) -> None:
114+
"""Initialize the dispatch handler.
115+
116+
Args:
117+
actor: A set of actors or a single actor to manage.
118+
dispatch_type: The type of dispatches to handle.
119+
running_status_receiver: The receiver for dispatch running status changes.
120+
updates_sender: The sender for dispatch events
121+
"""
122+
super().__init__()
123+
self._dispatch_rx = running_status_receiver
124+
self._actors = frozenset([actor] if isinstance(actor, Actor) else actor)
125+
self._dispatch_type = dispatch_type
126+
self._updates_sender = updates_sender
127+
128+
def _start_actors(self) -> None:
129+
"""Start all actors."""
130+
for actor in self._actors:
131+
if actor.is_running:
132+
_logger.warning("Actor %s is already running", actor.name)
133+
else:
134+
actor.start()
135+
136+
async def _stop_actors(self, msg: str) -> None:
137+
"""Stop all actors.
138+
139+
Args:
140+
msg: The message to be passed to the actors being stopped.
141+
"""
142+
for actor in self._actors:
143+
if actor.is_running:
144+
await actor.stop(msg)
145+
else:
146+
_logger.warning("Actor %s is not running", actor.name)
147+
148+
async def _run(self) -> None:
149+
"""Wait for dispatches and handle them."""
150+
async for dispatch in self._dispatch_rx:
151+
await self._handle_dispatch(dispatch=dispatch)
152+
153+
async def _handle_dispatch(self, dispatch: Dispatch) -> None:
154+
"""Handle a dispatch.
155+
156+
Args:
157+
dispatch: The dispatch to handle.
158+
"""
159+
running = dispatch.running(self._dispatch_type)
160+
match running:
161+
case RunningState.STOPPED:
162+
_logger.info("Stopped by dispatch %s", dispatch.id)
163+
await self._stop_actors("Dispatch stopped")
164+
case RunningState.RUNNING:
165+
if self._updates_sender is not None:
166+
_logger.info("Updated by dispatch %s", dispatch.id)
167+
await self._updates_sender.send(
168+
DispatchUpdate(
169+
components=dispatch.selector,
170+
dry_run=dispatch.dry_run,
171+
options=dispatch.payload,
172+
)
173+
)
174+
175+
_logger.info("Started by dispatch %s", dispatch.id)
176+
self._start_actors()
177+
case RunningState.DIFFERENT_TYPE:
178+
_logger.debug(
179+
"Unknown dispatch! Ignoring dispatch of type %s", dispatch.type
180+
)

0 commit comments

Comments
 (0)