Skip to content

Commit d156d8c

Browse files
kbrameldkbrameld@traclabs.com
authored andcommitted
apply changes from ros2#632
1 parent 9a44ebe commit d156d8c

File tree

3 files changed

+130
-16
lines changed

3 files changed

+130
-16
lines changed

launch/launch/actions/execute_local.py

Lines changed: 88 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
from osrf_pycommon.process_utils import async_execute_process # type: ignore
3737
from osrf_pycommon.process_utils import AsyncSubprocessProtocol
3838

39+
import psutil
40+
3941
from .emit_event import EmitEvent
4042
from .opaque_function import OpaqueFunction
4143
from .timer_action import TimerAction
@@ -64,9 +66,7 @@
6466
from ..launch_description_entity import LaunchDescriptionEntity
6567
from ..some_entities_type import SomeEntitiesType
6668
from ..some_substitutions_type import SomeSubstitutionsType
67-
from ..substitution import Substitution # noqa: F401
6869
from ..substitutions import LaunchConfiguration
69-
from ..substitutions import PythonExpression
7070
from ..utilities import is_a_subclass
7171
from ..utilities import normalize_to_list_of_substitutions
7272
from ..utilities import perform_substitutions
@@ -86,6 +86,8 @@ def __init__(
8686
'sigterm_timeout', default=5),
8787
sigkill_timeout: SomeSubstitutionsType = LaunchConfiguration(
8888
'sigkill_timeout', default=5),
89+
signal_lingering_subprocesses: SomeSubstitutionsType = LaunchConfiguration(
90+
'signal_lingering_subprocesses', default=True),
8991
emulate_tty: bool = False,
9092
output: SomeSubstitutionsType = 'log',
9193
output_format: Text = '[{this.process_description.final_name}] {line}',
@@ -158,6 +160,11 @@ def __init__(
158160
as a string or a list of strings and Substitutions to be resolved
159161
at runtime, defaults to the LaunchConfiguration called
160162
'sigkill_timeout'
163+
:param: signal_lingering_subprocesses if `True`, all subprocesses spawned by the process
164+
will be signaled to make sure they finish.
165+
The sequence of signals used is the same SIGINT/SIGTERM/SIGKILL sequence
166+
used to kill the main process.
167+
Subprocesses start being signaled when the main process completes.
161168
:param: emulate_tty emulate a tty (terminal), defaults to False, but can
162169
be overridden with the LaunchConfiguration called 'emulate_tty',
163170
the value of which is evaluated as true or false according to
@@ -190,6 +197,8 @@ def __init__(
190197
self.__shell = shell
191198
self.__sigterm_timeout = normalize_to_list_of_substitutions(sigterm_timeout)
192199
self.__sigkill_timeout = normalize_to_list_of_substitutions(sigkill_timeout)
200+
self.__signal_lingering_subprocesses = normalize_to_list_of_substitutions(
201+
signal_lingering_subprocesses)
193202
self.__emulate_tty = emulate_tty
194203
# Note: we need to use a temporary here so that we don't assign values with different types
195204
# to the same variable
@@ -219,6 +228,7 @@ def __init__(
219228
self.__shutdown_future = None # type: Optional[asyncio.Future]
220229
self.__sigterm_timer = None # type: Optional[TimerAction]
221230
self.__sigkill_timer = None # type: Optional[TimerAction]
231+
self.__children: List[psutil.Process] = []
222232
self.__stdout_buffer = io.StringIO()
223233
self.__stderr_buffer = io.StringIO()
224234

@@ -291,7 +301,11 @@ def _shutdown_process(self, context, *, send_sigint):
291301
self.__shutdown_future.set_result(None)
292302

293303
# Otherwise process is still running, start the shutdown procedures.
294-
context.extend_locals({'process_name': self.process_details['name']})
304+
context.extend_locals(
305+
{
306+
'process_name': self.process_details['name'],
307+
'process_pid': self.process_details['pid'],
308+
})
295309
actions_to_return = self.__get_shutdown_timer_actions()
296310
if send_sigint:
297311
actions_to_return.append(self.__get_sigint_event())
@@ -452,23 +466,17 @@ def __get_shutdown_timer_actions(self) -> List[Action]:
452466
base_msg = \
453467
"process[{}] failed to terminate '{}' seconds after receiving '{}', escalating to '{}'"
454468

455-
def printer(context, msg, timeout_substitutions):
456-
self.__logger.error(msg.format(
457-
context.locals.process_name,
458-
perform_substitutions(context, timeout_substitutions),
459-
))
469+
def printer(context, msg):
470+
self.__logger.error(msg.format(context.locals.process_name))
460471

461-
sigterm_timeout = self.__sigterm_timeout
462-
sigkill_timeout = [PythonExpression(
463-
('float(', *self.__sigterm_timeout, ') + float(', *self.__sigkill_timeout, ')')
464-
)]
465472
# Setup a timer to send us a SIGTERM if we don't shutdown quickly.
473+
sigterm_timeout = self.__sigterm_timeout_value
466474
self.__sigterm_timer = TimerAction(
467475
period=sigterm_timeout,
468476
actions=[
469477
OpaqueFunction(
470478
function=printer,
471-
args=(base_msg.format('{}', '{}', 'SIGINT', 'SIGTERM'), sigterm_timeout)
479+
args=(base_msg.format('{}', sigterm_timeout, 'SIGINT', 'SIGTERM'), )
472480
),
473481
EmitEvent(event=SignalProcess(
474482
signal_number=signal.SIGTERM,
@@ -477,13 +485,14 @@ def printer(context, msg, timeout_substitutions):
477485
],
478486
cancel_on_shutdown=False,
479487
)
488+
sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value
480489
# Setup a timer to send us a SIGKILL if we don't shutdown after SIGTERM.
481490
self.__sigkill_timer = TimerAction(
482491
period=sigkill_timeout,
483492
actions=[
484493
OpaqueFunction(
485494
function=printer,
486-
args=(base_msg.format('{}', '{}', 'SIGTERM', 'SIGKILL'), sigkill_timeout)
495+
args=(base_msg.format('{}', sigkill_timeout, 'SIGTERM', 'SIGKILL'), )
487496
),
488497
EmitEvent(event=SignalProcess(
489498
signal_number='SIGKILL',
@@ -492,6 +501,13 @@ def printer(context, msg, timeout_substitutions):
492501
],
493502
cancel_on_shutdown=False,
494503
)
504+
self.__children = []
505+
pid = self._subprocess_transport.get_pid()
506+
if pid is not None:
507+
try:
508+
self.__children = psutil.Process(pid).children(recursive=True)
509+
except psutil.NoSuchProcess:
510+
pass
495511
return [
496512
cast(Action, self.__sigterm_timer),
497513
cast(Action, self.__sigkill_timer),
@@ -503,12 +519,15 @@ def __get_sigint_event(self):
503519
process_matcher=matches_action(self),
504520
))
505521

506-
def __cleanup(self):
507-
# Cancel any pending timers we started.
522+
def __cleanup_timers(self):
508523
if self.__sigterm_timer is not None:
509524
self.__sigterm_timer.cancel()
510525
if self.__sigkill_timer is not None:
511526
self.__sigkill_timer.cancel()
527+
528+
def __cleanup(self):
529+
# Cancel any pending timers we started.
530+
self.__cleanup_timers()
512531
# Close subprocess transport if any.
513532
if self._subprocess_transport is not None:
514533
self._subprocess_transport.close()
@@ -541,6 +560,48 @@ def on_stdout_received(self, data: bytes) -> None:
541560
def on_stderr_received(self, data: bytes) -> None:
542561
self.__context.emit_event_sync(ProcessStderr(text=data, **self.__process_event_args))
543562

563+
async def _signal_subprocesses(self, context):
564+
to_signal = self.__children
565+
signaled = []
566+
sig = signal.SIGINT
567+
start_time = context.asyncio_loop.time()
568+
sigterm_timeout = self.__sigterm_timeout_value
569+
sigkill_timeout = self.__sigterm_timeout_value + self.__sigkill_timeout_value
570+
process_pid = self.process_details['pid']
571+
process_name = self.process_details['name']
572+
log_prefix_format = (
573+
'subprocess[pid={}] of process['
574+
f'{process_name}, pid={process_pid}]: ')
575+
next_signals = iter(((signal.SIGTERM, sigterm_timeout), (signal.SIGKILL, sigkill_timeout)))
576+
while True:
577+
for p in to_signal:
578+
try:
579+
p.send_signal(sig)
580+
except psutil.NoSuchProcess:
581+
continue
582+
log_prefix = log_prefix_format.format(p.pid)
583+
self.__logger.info(
584+
f'{log_prefix}sending {sig.name} to subprocess directly.'
585+
)
586+
signaled.append(p)
587+
try:
588+
sig, timeout = next(next_signals)
589+
except StopIteration:
590+
return
591+
current_time = context.asyncio_loop.time()
592+
while current_time < start_time + timeout:
593+
await asyncio.sleep(min(0.5, start_time + timeout - current_time))
594+
for p in list(signaled):
595+
if not p.is_running():
596+
log_prefix = log_prefix_format.format(p.pid)
597+
self.__logger.info(f'{log_prefix}exited')
598+
signaled.remove(p)
599+
if not signaled:
600+
return
601+
current_time = context.asyncio_loop.time()
602+
to_signal = signaled
603+
signaled = []
604+
544605
async def __execute_process(self, context: LaunchContext) -> None:
545606
process_event_args = self.__process_event_args
546607
if process_event_args is None:
@@ -617,8 +678,13 @@ async def __execute_process(self, context: LaunchContext) -> None:
617678
timeout=self.__respawn_delay
618679
)
619680
if not self.__shutdown_future.done():
681+
if self.__signal_lingering_subprocesses_value:
682+
await self._signal_subprocesses(context)
620683
context.asyncio_loop.create_task(self.__execute_process(context))
621684
return
685+
self.__cleanup_timers()
686+
if self.__signal_lingering_subprocesses_value:
687+
await self._signal_subprocesses(context)
622688
self.__cleanup()
623689

624690
def prepare(self, context: LaunchContext):
@@ -701,6 +767,12 @@ def execute(self, context: LaunchContext) -> Optional[List[LaunchDescriptionEnti
701767
]
702768
for event_handler in event_handlers:
703769
context.register_event_handler(event_handler)
770+
self.__sigterm_timeout_value = perform_typed_substitution(
771+
context, self.__sigterm_timeout, float)
772+
self.__sigkill_timeout_value = perform_typed_substitution(
773+
context, self.__sigkill_timeout, float)
774+
self.__signal_lingering_subprocesses_value = perform_typed_substitution(
775+
context, self.__signal_lingering_subprocesses, bool)
704776

705777
try:
706778
self.__completed_future = context.asyncio_loop.create_future()

launch/package.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
<exec_depend>ament_index_python</exec_depend>
2626
<exec_depend>python3-importlib-metadata</exec_depend>
2727
<exec_depend>python3-lark-parser</exec_depend>
28+
<exec_depend>python3-psutil</exec_depend>
2829
<exec_depend>python3-yaml</exec_depend>
2930

3031
<test_depend>ament_copyright</test_depend>

launch/test/launch/test_execute_local.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717

1818
"""Tests for the ExecuteLocal Action."""
1919

20+
import asyncio
2021
import os
22+
import signal
2123
import sys
24+
import time
2225

2326
from launch import LaunchDescription
2427
from launch import LaunchService
@@ -28,6 +31,8 @@
2831
from launch.actions import TimerAction
2932
from launch.descriptions import Executable
3033

34+
import psutil
35+
3136
import pytest
3237

3338

@@ -178,3 +183,39 @@ def test_execute_process_with_output_dictionary():
178183
ls = LaunchService()
179184
ls.include_launch_description(ld)
180185
assert 0 == ls.run()
186+
187+
188+
PYTHON_SCRIPT = """\
189+
import time
190+
191+
while 1:
192+
time.sleep(0.5)
193+
"""
194+
195+
196+
def test_kill_subprocesses():
197+
"""Test launching a process with an environment variable."""
198+
executable = ExecuteLocal(
199+
process_description=Executable(
200+
cmd=['python3', '-c', f'"{PYTHON_SCRIPT}"'],
201+
),
202+
shell=True,
203+
output='screen',
204+
)
205+
ld = LaunchDescription([executable])
206+
ls = LaunchService()
207+
ls.include_launch_description(ld)
208+
loop = asyncio.new_event_loop()
209+
asyncio.set_event_loop(loop)
210+
run_async_task = loop.create_task(ls.run_async())
211+
212+
async def wait_for_subprocesses():
213+
start = time.time()
214+
while len(psutil.Process().children(recursive=True)) != 2:
215+
await asyncio.sleep(0.5)
216+
assert time.time() < start + 5., 'timed out waiting for processes to setup'
217+
wait_for_subprocesses_task = loop.create_task(wait_for_subprocesses())
218+
loop.run_until_complete(wait_for_subprocesses_task)
219+
os.kill(executable.process_details['pid'], signal.SIGTERM)
220+
loop.run_until_complete(run_async_task)
221+
assert len(psutil.Process().children(recursive=True)) == 0

0 commit comments

Comments
 (0)