Skip to content
13 changes: 10 additions & 3 deletions ros2launch/ros2launch/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import launch
from launch.frontend import Parser
from launch.launch_description_sources import get_launch_description_from_any_launch_file
from launch_ros.actions import SetRemap


class MultipleLaunchFilesError(Exception):
Expand Down Expand Up @@ -145,7 +146,8 @@ def launch_a_launch_file(
noninteractive=False,
args=None,
option_extensions={},
debug=False
debug=False,
remap_rules=None
):
"""Launch a given launch file (by path) and pass it the given launch file arguments."""
for name in sorted(option_extensions.keys()):
Expand All @@ -167,14 +169,19 @@ def launch_a_launch_file(
parsed_launch_arguments = parse_launch_arguments(launch_file_arguments)
# Include the user provided launch file using IncludeLaunchDescription so that the
# location of the current launch file is set.
launch_description = launch.LaunchDescription([
launch_description = launch.LaunchDescription()
if remap_rules is not None:
for remap_rule in remap_rules:
from_name, to_name = remap_rule.split(':=', maxsplit=1)
launch_description.add_action(SetRemap(src=from_name, dst=to_name))
launch_description.add_action(
launch.actions.IncludeLaunchDescription(
launch.launch_description_sources.AnyLaunchDescriptionSource(
launch_file_path
),
launch_arguments=parsed_launch_arguments,
),
])
)
for name in sorted(option_extensions.keys()):
result = option_extensions[name].prelaunch(
launch_description,
Expand Down
7 changes: 6 additions & 1 deletion ros2launch/ros2launch/command/launch.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ def add_arguments(self, parser, cli_name):
help=('Regex pattern for filtering which executables the --launch-prefix is applied '
'to by matching the executable name.')
)
parser.add_argument(
'-r', '--remap', action='append', dest='remap_rules',
help=("Remapping rules, in the 'from:=to' form")
)
arg = parser.add_argument(
'package_name',
help='Name of the ROS package which contains the launch file')
Expand Down Expand Up @@ -175,5 +179,6 @@ def main(self, *, parser, args):
noninteractive=args.noninteractive,
args=args,
option_extensions=self._option_extensions,
debug=args.debug
debug=args.debug,
remap_rules=args.remap_rules
)
216 changes: 216 additions & 0 deletions ros2launch/test/test_cli_remap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#!/usr/bin/env python3
# Copyright 2025 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os
import signal
import sys
import threading
import time
import unittest

from ament_index_python.packages import PackageNotFoundError
import rclpy
from rclpy.node import Node

from ros2launch.api.api import (
get_share_file_path_from_package,
launch_a_launch_file,
MultipleLaunchFilesError
)

# Configure basic logging
logging.basicConfig(level=logging.INFO, stream=sys.stderr,
format='%(asctime)s - %(levelname)s - [%(threadName)s] %(message)s')


class TestRemapArgument(unittest.TestCase):
"""Test the --remap command line argument for ros2 launch."""

def test_remap_argument(self):
"""Test that the --remap argument correctly remaps topics using direct API call."""
logging.info('Starting test_remap_argument...')

checker_thread = None
# Flag to indicate test success from the checker thread
test_successful = threading.Event()
test_failed_assertion = None

def check_topics_target():
nonlocal test_failed_assertion
logging.info('Checker thread started.')

# Initialize rclpy and create node for checking topics
logging.info('Checker thread: Initializing rclpy...')
# Ensure rclpy initializes in this thread context if necessary
# It might inherit context or need specific args depending on ROS setup
try:
rclpy.init()
except Exception as e:
logging.error(f'Checker thread: rclpy.init() failed: {e}', exc_info=True)
# Can't proceed without rclpy
os.kill(os.getpid(), signal.SIGINT) # Signal main thread to stop
return

node = None
try:
logging.info('Checker thread: Creating node __test_remap_checker...')
node = Node('__test_remap_checker', use_global_arguments=False)
logging.info('Checker thread: Node created.')

# Wait for the remapped topic to appear, poll for it
start_time = time.time()
timeout = 25.0
remapped_topic_found = False
iteration = 0

logging.info('Checker thread: Starting topic polling loop...')
while time.time() - start_time < timeout:
iteration += 1
# Check if main thread is still alive (optional, sanity check)

topic_names_and_types = node.get_topic_names_and_types()
current_topics = [name for name, types in topic_names_and_types]
logging.info(f'Poll {iteration}. Topic count: {len(current_topics)}')

if '/chatter_remapped' in current_topics:
logging.info("Checker thread: Found target topic '/chatter_remapped'!")
remapped_topic_found = True
break

time.sleep(0.5)

log_msg = f'Polling finished. Found remapped topic: {remapped_topic_found}'
logging.info(f'Checker thread: {log_msg}')

final_topic_names_and_types = node.get_topic_names_and_types()
final_topics = [name for name, types in final_topic_names_and_types]
logging.info(f'Checker thread: Final topics: {final_topics}')

# Perform assertions within this thread using self.assertX methods
try:
logging.info('Checker thread: Asserting /chatter_remapped is present...')
msg = (
f'Expected topic "/chatter_remapped" not found within {timeout}s. '
f'Final topics: {final_topics}'
)
self.assertTrue(remapped_topic_found, msg)
logging.info('Checker thread: Assertion passed.')

logging.info('Checker thread: Asserting /chatter is NOT present...')
msg = (
f'Unexpectedly found original topic "/chatter". '
f'Final topics: {final_topics}'
)
self.assertNotIn('/chatter', final_topics, msg)
logging.info('Checker thread: Assertion passed.')

# If assertions pass, set the success flag
test_successful.set()
except AssertionError as e:
logging.error(f'Checker thread: Assertion failed: {e}')
test_failed_assertion = e # Store assertion for main thread reporting
except Exception as e:
# Catch and print any other exceptions from the launch thread
logging.error(f'Checker thread: Error during checks: {e}', exc_info=True)
finally:
# Clean up node and rclpy
if node is not None:
logging.info('Checker thread: Destroying node...')
node.destroy_node()
if rclpy.ok():
logging.info('Checker thread: Shutting down rclpy...')
rclpy.shutdown()
logging.info('Checker thread: rclpy cleanup finished.')

# Signal the main thread (running launch) to stop
log_msg = 'Signaling main thread (SIGINT) to stop launch service...'
logging.info(f'Checker thread: {log_msg}')
os.kill(os.getpid(), signal.SIGINT)
logging.info('Checker thread: Exiting.')

# Main thread execution starts here
try:
logging.info('Main thread: Creating checker thread...')
# Make checker non-daemon so main thread waits for it via join()
checker_thread = threading.Thread(target=check_topics_target, daemon=False)
logging.info('Main thread: Starting checker thread...')
checker_thread.start()

launch_return_code = None
try:
# Find launch file (moved here from background thread)
logging.info('Main thread: Finding launch file...')
package = 'demo_nodes_cpp'
file = 'talker_listener_launch.py'
launch_file_path = get_share_file_path_from_package(
package_name=package, file_name=file
)
logging.info(f'Main thread: Found launch file: {launch_file_path}')

remap_rules = ['/chatter:=/chatter_remapped']

# Call launch_a_launch_file in the main thread
logging.info('Main thread: Calling launch_a_launch_file (blocking)...')
launch_return_code = launch_a_launch_file(
launch_file_path=launch_file_path,
launch_file_arguments=[],
noninteractive=True,
debug=False,
remap_rules=remap_rules
)
log_msg = f'launch_a_launch_file returned with code: {launch_return_code}'
logging.info(f'Main thread: {log_msg}')

except (PackageNotFoundError, FileNotFoundError, MultipleLaunchFilesError) as e:
logging.error(f'Main thread: Error finding launch file: {e}')
# Ensure checker thread is stopped if launch setup failed
if checker_thread and checker_thread.is_alive():
log_msg = 'Signaling checker thread to stop due to launch error...'
logging.info(f'Main thread: {log_msg}')
# Ideally have a cleaner way, but SIGINT might work if thread handles it
# Or rely on join timeout below
except Exception as e:
logging.error(f'Main thread: Error during launch: {e}', exc_info=True)
finally:
# Wait for the checker thread to finish its checks and signal
logging.info('Main thread: Joining checker thread...')
if checker_thread is not None:
checker_thread.join(timeout=30.0) # Increased timeout to allow checks
if checker_thread.is_alive():
logging.warning('Checker thread is still alive after timeout.')
else:
logging.info('Checker thread joined successfully.')

# Outer try block needs a finally or except
# Re-adding the final check logic here which belongs to the outer try
finally:
# After launch and checker thread have finished, check assertion results
logging.info('Main thread: Checking test results...')
if test_failed_assertion:
# Re-raise the assertion failure captured from the checker thread
raise test_failed_assertion
elif not test_successful.is_set():
# Fail if the checker thread didn't explicitly signal success
msg = 'Test failed: Checker thread did not signal success'
self.fail(msg)
else:
logging.info('Main thread: Test success confirmed by checker thread.')

logging.info('Main thread: Test finished.')


if __name__ == '__main__':
unittest.main()