14
14
15
15
"""Module for the LoadComposableNodes action."""
16
16
17
+ import asyncio
17
18
from pathlib import Path
18
19
import threading
20
+ import os
19
21
20
22
from typing import List
21
23
from typing import Optional
22
24
from typing import Text
23
25
from typing import Union
24
26
27
+ import rclpy
25
28
import composition_interfaces .srv
26
29
27
30
from launch .action import Action
31
+ from launch .event import Event
32
+ from launch .event_handlers import OnShutdown
28
33
from launch .frontend import Entity
29
34
from launch .frontend import expose_action
30
35
from launch .frontend import Parser
31
36
from launch .launch_context import LaunchContext
32
37
import launch .logging
38
+ from launch .some_entities_type import SomeEntitiesType
33
39
from launch .some_substitutions_type import SomeSubstitutionsType
34
40
from launch .some_substitutions_type import SomeSubstitutionsType_types_tuple
35
41
from launch .utilities import ensure_argument_type
44
50
from .lifecycle_transition import LifecycleTransition
45
51
46
52
from ..descriptions import ComposableNode
47
- from ..ros_adapters import get_ros_node
53
+ from ..ros_adapters import get_ros_node , ROSAdapter
48
54
from ..utilities import add_node_name
49
55
from ..utilities import evaluate_parameters
50
56
from ..utilities import get_node_name_count
@@ -63,6 +69,7 @@ def __init__(
63
69
* ,
64
70
composable_node_descriptions : List [ComposableNode ],
65
71
target_container : Union [SomeSubstitutionsType , ComposableNodeContainer ],
72
+ unload_on_shutdown : Union [bool , SomeSubstitutionsType ] = False ,
66
73
** kwargs ,
67
74
) -> None :
68
75
"""
@@ -85,11 +92,21 @@ def __init__(
85
92
'target_container' ,
86
93
'LoadComposableNodes'
87
94
)
95
+ ensure_argument_type (
96
+ unload_on_shutdown ,
97
+ list (SomeSubstitutionsType_types_tuple ) +
98
+ [bool ],
99
+ 'unload_on_shutdown' ,
100
+ 'LoadComposableNodes'
101
+ )
88
102
super ().__init__ (** kwargs )
89
103
self .__composable_node_descriptions = composable_node_descriptions
90
104
self .__target_container = target_container
105
+ self .__unload_on_shutdown = unload_on_shutdown
91
106
self .__final_target_container_name : Optional [Text ] = None
92
107
self .__logger = launch .logging .get_logger (__name__ )
108
+ self .__loaded_nodes = {} # Track loaded nodes for unloading
109
+ self .__no_shutdown_future : Optional [asyncio .Future ] = None
93
110
94
111
@classmethod
95
112
def parse (cls , entity : Entity , parser : Parser ):
@@ -105,6 +122,9 @@ def parse(cls, entity: Entity, parser: Parser):
105
122
composable_node_cls , composable_node_kwargs = ComposableNode .parse (parser , entity )
106
123
kwargs ['composable_node_descriptions' ].append (
107
124
composable_node_cls (** composable_node_kwargs ))
125
+
126
+ kwargs ['unload_on_shutdown' ] = parser .parse_if_substitutions (
127
+ entity .get_attr ('unload_on_shutdown' , data_type = bool , optional = True ))
108
128
109
129
return cls , kwargs
110
130
@@ -162,6 +182,7 @@ def unblock(future):
162
182
163
183
node_name = response .full_node_name if response .full_node_name else request .node_name
164
184
if response .success :
185
+ self .__loaded_nodes [response .unique_id ] = node_name
165
186
if node_name is not None :
166
187
add_node_name (context , node_name )
167
188
node_name_count = get_node_name_count (context , node_name )
@@ -205,6 +226,144 @@ def _load_in_sequence(
205
226
)
206
227
)
207
228
229
+ def _unload_node (
230
+ self ,
231
+ request : composition_interfaces .srv .UnloadNode .Request ,
232
+ client : rclpy .client .Client
233
+ ) -> None :
234
+ """
235
+ Unload node synchronously.
236
+
237
+ :param request: service request to unload a node
238
+ :param client: service client to call the service
239
+ """
240
+ if not client .wait_for_service (timeout_sec = 5.0 ):
241
+ self .__logger .warning (
242
+ "Abandoning wait for the '{}' service, due to timeout." .format (
243
+ client .srv_name
244
+ )
245
+ )
246
+
247
+ # Asynchronously wait on service call so that we can periodically check for shutdown
248
+ event = threading .Event ()
249
+
250
+ def unblock (future ):
251
+ nonlocal event
252
+ event .set ()
253
+
254
+ self .__logger .debug (
255
+ "Calling the '{}' service with request '{}'" .format (
256
+ client .srv_name , request
257
+ )
258
+ )
259
+
260
+ response_future = client .call_async (request )
261
+ response_future .add_done_callback (unblock )
262
+
263
+ timeout = 10.0
264
+ while not event .wait (1.0 ):
265
+ timeout -= 1.0
266
+ if timeout <= 0.0 :
267
+ self .__logger .warning (
268
+ "Abandoning wait for the '{}' service response, due to timeout." .format (
269
+ client .srv_name ),
270
+ )
271
+ response_future .cancel ()
272
+ return
273
+
274
+ # Get response
275
+ if response_future .exception () is not None :
276
+ raise response_future .exception ()
277
+ response = response_future .result ()
278
+
279
+ self .__logger .debug ("Received response '{}'" .format (response ))
280
+
281
+ node_name = self .__loaded_nodes [request .unique_id ]
282
+ if response .success :
283
+ del self .__loaded_nodes [request .unique_id ]
284
+ self .__logger .info ("Unloaded node '{}' in container '{}'" .format (
285
+ node_name , self .__final_target_container_name
286
+ ))
287
+ else :
288
+ self .__logger .error (
289
+ "Failed to unload node '{}' in container '{}': {}" .format (
290
+ node_name , self .__final_target_container_name ,
291
+ response .error_message
292
+ )
293
+ )
294
+
295
+ def _unload_in_sequence (
296
+ self ,
297
+ unload_node_requests : List [composition_interfaces .srv .UnloadNode .Request ],
298
+ context : LaunchContext ,
299
+ ros_adapter : ROSAdapter = None ,
300
+ client : rclpy .client .Client = None
301
+ ) -> None :
302
+ """
303
+ Unload composable nodes sequentially.
304
+
305
+ :param unload_node_requests: a list of LoadNode service requests to execute
306
+ :param context: current launch context
307
+ :param ros_adapter: the ROS adapter to use for unloading
308
+ :param client: the service client to use for unloading
309
+ """
310
+ # If this is the first call, a ROS node has to be created.
311
+ # We need to create it ourselves, since the global ROS adapter also uses OnShutdown
312
+ # to terminate, which may happen before our handler is called.
313
+ if ros_adapter is None :
314
+ # Init an auxiliary ROS adapter solely for processing the unloading requests.
315
+ ros_adapter = ROSAdapter (autostart = False )
316
+ ros_adapter .start (custom_name = 'launch_ros_{}_unload_nodes_{}' .format (os .getpid (), id (self )))
317
+ # Create the service client.
318
+ client = ros_adapter .ros_node .create_client (
319
+ composition_interfaces .srv .UnloadNode , '{}/_container/unload_node' .format (
320
+ self .__final_target_container_name
321
+ )
322
+ )
323
+
324
+ # Process the first request and schedule the rest.
325
+ next_unload_node_request = unload_node_requests [0 ]
326
+ unload_node_requests = unload_node_requests [1 :]
327
+ self ._unload_node (next_unload_node_request , client )
328
+ if len (unload_node_requests ) > 0 :
329
+ context .add_completion_future (
330
+ context .asyncio_loop .run_in_executor (
331
+ None , self ._unload_in_sequence , unload_node_requests , context , ros_adapter , client
332
+ )
333
+ )
334
+ else :
335
+ # If this was the last request, stop the ROS node.
336
+ ros_adapter .shutdown ()
337
+
338
+ def __on_shutdown (self , event : Event , context : LaunchContext ) -> Optional [SomeEntitiesType ]:
339
+ """
340
+ Unload all nodes when the launch context is shutdown.
341
+ This handler is only registered if `unload_on_shutdown` is True.
342
+ """
343
+ # Generate unload requests
344
+ unload_node_requests = []
345
+ for unique_id , node_name in self .__loaded_nodes .items ():
346
+ request = composition_interfaces .srv .UnloadNode .Request ()
347
+ request .unique_id = unique_id
348
+ unload_node_requests .append (request )
349
+
350
+ # Process the unload requests
351
+ if unload_node_requests :
352
+ context .add_completion_future (
353
+ context .asyncio_loop .run_in_executor (
354
+ None , self ._unload_in_sequence , unload_node_requests , context
355
+ )
356
+ )
357
+
358
+ # Cancel the dummy coroutine so that the launch process can exit
359
+ # The exit will happen once the futures from _unload_in_sequence complete
360
+ if self .__no_shutdown_future is not None :
361
+ self .__no_shutdown_future .cancel ()
362
+
363
+ def get_asyncio_future (self ) -> Optional [asyncio .Future ]:
364
+ """Return the future used to track completion."""
365
+ return self .__no_shutdown_future
366
+
208
367
def execute (
209
368
self ,
210
369
context : LaunchContext
@@ -275,6 +434,22 @@ def execute(
275
434
if len (autostart_actions ) != 0 :
276
435
return autostart_actions
277
436
437
+ if self .__unload_on_shutdown :
438
+ # Register a shutdown event handler to unload the nodes when the launch context is shutdown.
439
+ context .register_event_handler (
440
+ OnShutdown (on_shutdown = self .__on_shutdown )
441
+ )
442
+
443
+ # Start dummy coroutine so that the launch process does not exit
444
+ async def dummy_coroutine ():
445
+ try :
446
+ while True :
447
+ await asyncio .sleep (1.0 )
448
+ except asyncio .CancelledError :
449
+ pass
450
+ self .__no_shutdown_future = context .asyncio_loop .create_task (
451
+ dummy_coroutine ()
452
+ )
278
453
279
454
def get_composable_node_load_request (
280
455
composable_node_description : ComposableNode ,
0 commit comments