Skip to content

Commit 0b6c82d

Browse files
committed
Add test package for single and multi thread
Signed-off-by: ical-Jeon <[email protected]>
1 parent 355adf0 commit 0b6c82d

7 files changed

+415
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
cmake_minimum_required(VERSION 3.5)
2+
project(simple_thread_tester)
3+
4+
find_package(ament_cmake REQUIRED)
5+
find_package(rclcpp REQUIRED)
6+
find_package(std_msgs REQUIRED)
7+
8+
set(CMAKE_CXX_STANDARD 17)
9+
set(CMAKE_BUILD_TYPE Debug)
10+
11+
add_executable(publisher_node publisher_node.cc)
12+
ament_target_dependencies(publisher_node rclcpp std_msgs)
13+
install(TARGETS publisher_node DESTINATION lib/simple_thread_tester)
14+
15+
add_executable(single_thread_subscriber_node single_thread_subscriber_node.cc)
16+
ament_target_dependencies(single_thread_subscriber_node rclcpp std_msgs)
17+
install(TARGETS single_thread_subscriber_node
18+
DESTINATION lib/simple_thread_tester)
19+
20+
add_executable(multi_thread_reentrant_subscriber_node
21+
multi_thread_reentrant_subscriber_node.cc)
22+
ament_target_dependencies(multi_thread_reentrant_subscriber_node rclcpp
23+
std_msgs)
24+
install(TARGETS multi_thread_reentrant_subscriber_node
25+
DESTINATION lib/simple_thread_tester)
26+
27+
add_executable(multi_thread_mutually_exclusive_subscriber_node
28+
multi_thread_mutually_exclusive_subscriber_node.cc)
29+
ament_target_dependencies(multi_thread_mutually_exclusive_subscriber_node
30+
rclcpp std_msgs)
31+
install(TARGETS multi_thread_mutually_exclusive_subscriber_node
32+
DESTINATION lib/simple_thread_tester)
33+
34+
ament_package()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
# Simple ros2 package to understand threads
2+
3+
Visit [here](https://motion-boseong.vercel.app/threading-ros2) for explanations.
4+
5+
## Running node
6+
7+
### 1. Single-thread test
8+
In terminal A (also for the two belows)
9+
10+
```
11+
ros2 run simple_thread_tester publisher_node
12+
```
13+
14+
In terminal B
15+
16+
```
17+
ros2 run simple_thread_tester single_thread_subscriber_node
18+
```
19+
### 2. Multi-thread reentrant callback group test
20+
21+
```
22+
ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node
23+
```
24+
25+
### 3. Multi-thread mutually exclusive callback group test
26+
27+
```
28+
ros2 run simple_thread_tester multi_thread_reentrant_subscriber_node
29+
# If simulate mutex
30+
ros2 param set /subscriber_node use_mutex true
31+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
#include <chrono>
2+
#include <random>
3+
#include <thread>
4+
5+
#include <rclcpp/rclcpp.hpp>
6+
#include <std_msgs/msg/string.hpp>
7+
8+
using namespace std::chrono_literals;
9+
10+
class MultiThreadMutuallyExclusiveSubscriber : public rclcpp::Node {
11+
12+
public:
13+
MultiThreadMutuallyExclusiveSubscriber(rclcpp::NodeOptions node_options)
14+
: Node("subscriber_node",
15+
node_options.allow_undeclared_parameters(true)) {
16+
rclcpp::SubscriptionOptions options;
17+
options.callback_group =
18+
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
19+
20+
short_subscriber_ = create_subscription<std_msgs::msg::String>(
21+
"/short_topic", rclcpp::QoS(10),
22+
std::bind(&MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback,
23+
this, std::placeholders::_1),
24+
options);
25+
26+
options.callback_group =
27+
create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
28+
long_subscriber_ = create_subscription<std_msgs::msg::String>(
29+
"/long_topic", rclcpp::QoS(10),
30+
std::bind(&MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback,
31+
this, std::placeholders::_1),
32+
options);
33+
34+
timer_ = create_wall_timer(
35+
1s, std::bind(&MultiThreadMutuallyExclusiveSubscriber::TimerCallback,
36+
this));
37+
38+
auto param_change_callback =
39+
[this](std::vector<rclcpp::Parameter> parameters) {
40+
for (auto parameter : parameters) {
41+
if (parameter.get_name() == "use_mutex") {
42+
use_mutex = parameter.as_bool();
43+
if (use_mutex)
44+
RCLCPP_INFO(get_logger(), "will use mutex");
45+
else
46+
RCLCPP_INFO(get_logger(), "will not mutex");
47+
}
48+
}
49+
50+
auto result = rcl_interfaces::msg::SetParametersResult();
51+
result.successful = true;
52+
return result;
53+
};
54+
55+
callback_handler =
56+
this->add_on_set_parameters_callback(param_change_callback);
57+
}
58+
59+
private:
60+
bool use_mutex{false};
61+
struct {
62+
std::mutex short_topic_mutex;
63+
std::mutex long_topic_mutex;
64+
} mutex_list;
65+
std::string processed_short_string_;
66+
std::string processed_long_string_;
67+
void ProcessString(const std::string &raw_string, std::string &output);
68+
69+
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr short_subscriber_;
70+
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr long_subscriber_;
71+
rclcpp::TimerBase::SharedPtr timer_;
72+
OnSetParametersCallbackHandle::SharedPtr callback_handler;
73+
74+
void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg);
75+
void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg);
76+
void TimerCallback();
77+
};
78+
79+
void MultiThreadMutuallyExclusiveSubscriber::ProcessString(
80+
const std::string &raw_string, std::string &output) {
81+
output = "** ";
82+
83+
for (char c : raw_string) {
84+
output.push_back(c);
85+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
86+
}
87+
output += " **";
88+
};
89+
90+
void MultiThreadMutuallyExclusiveSubscriber::ShortTopicCallback(
91+
const std_msgs::msg::String::SharedPtr msg) {
92+
std::shared_ptr<std::unique_lock<std::mutex>> lock;
93+
if (use_mutex)
94+
mutex_list.short_topic_mutex.lock();
95+
ProcessString(msg->data, processed_short_string_);
96+
RCLCPP_INFO(get_logger(), "Setting processed: %s",
97+
processed_short_string_.c_str());
98+
if (use_mutex)
99+
mutex_list.short_topic_mutex.unlock();
100+
}
101+
102+
void MultiThreadMutuallyExclusiveSubscriber::LongTopicCallback(
103+
const std_msgs::msg::String::SharedPtr msg) {
104+
105+
if (use_mutex)
106+
mutex_list.long_topic_mutex.lock();
107+
ProcessString(msg->data, processed_long_string_);
108+
RCLCPP_INFO(get_logger(), "Setting processed: %s",
109+
processed_long_string_.c_str());
110+
if (use_mutex)
111+
mutex_list.long_topic_mutex.unlock();
112+
}
113+
114+
void MultiThreadMutuallyExclusiveSubscriber::TimerCallback() {
115+
if (use_mutex) {
116+
mutex_list.short_topic_mutex.lock();
117+
mutex_list.long_topic_mutex.lock();
118+
}
119+
RCLCPP_INFO(get_logger(),
120+
"Getting processed strings: \n [long] %s \n [short] %s",
121+
processed_long_string_.c_str(), processed_short_string_.c_str());
122+
123+
if (use_mutex) {
124+
mutex_list.short_topic_mutex.unlock();
125+
mutex_list.long_topic_mutex.unlock();
126+
}
127+
}
128+
129+
int main(int argc, char **argv) {
130+
rclcpp::init(argc, argv);
131+
rclcpp::NodeOptions option;
132+
auto node = std::make_shared<MultiThreadMutuallyExclusiveSubscriber>(option);
133+
rclcpp::executors::MultiThreadedExecutor executor;
134+
executor.add_node(node);
135+
executor.spin();
136+
rclcpp::shutdown();
137+
return 0;
138+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#include <chrono>
2+
#include <random>
3+
#include <thread>
4+
5+
#include <rclcpp/rclcpp.hpp>
6+
#include <std_msgs/msg/string.hpp>
7+
8+
using namespace std::chrono_literals;
9+
10+
class MultiThreadReentrantSubscriber : public rclcpp::Node {
11+
12+
public:
13+
MultiThreadReentrantSubscriber() : Node("subscriber_node") {
14+
rclcpp::SubscriptionOptions options;
15+
options.callback_group =
16+
create_callback_group(rclcpp::CallbackGroupType::Reentrant);
17+
18+
short_subscriber_ = create_subscription<std_msgs::msg::String>(
19+
"/short_topic", rclcpp::QoS(10),
20+
std::bind(&MultiThreadReentrantSubscriber::ShortTopicCallback, this,
21+
std::placeholders::_1),
22+
options);
23+
24+
long_subscriber_ = create_subscription<std_msgs::msg::String>(
25+
"/long_topic", rclcpp::QoS(10),
26+
std::bind(&MultiThreadReentrantSubscriber::LongTopicCallback, this,
27+
std::placeholders::_1),
28+
options);
29+
}
30+
31+
private:
32+
std::string processed_short_string_;
33+
std::string processed_long_string_;
34+
std::string ProcessString(const std::string &raw_string);
35+
36+
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr short_subscriber_;
37+
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr long_subscriber_;
38+
void ShortTopicCallback(const std_msgs::msg::String::SharedPtr msg);
39+
void LongTopicCallback(const std_msgs::msg::String::SharedPtr msg);
40+
};
41+
42+
std::string
43+
MultiThreadReentrantSubscriber::ProcessString(const std::string &raw_string) {
44+
std::thread::id this_id = std::this_thread::get_id();
45+
std::ostringstream oss;
46+
oss << std::this_thread::get_id();
47+
48+
std::this_thread::sleep_for(std::chrono::seconds(2));
49+
return "** " + raw_string + " **" + " ( by " + oss.str() + ") ";
50+
};
51+
52+
void MultiThreadReentrantSubscriber::ShortTopicCallback(
53+
const std_msgs::msg::String::SharedPtr msg) {
54+
auto processed_string = ProcessString(msg->data);
55+
56+
RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str());
57+
processed_short_string_ = processed_string;
58+
}
59+
60+
void MultiThreadReentrantSubscriber::LongTopicCallback(
61+
const std_msgs::msg::String::SharedPtr msg) {
62+
auto processed_string = ProcessString(msg->data);
63+
64+
RCLCPP_INFO(get_logger(), "Setting processed: %s", processed_string.c_str());
65+
processed_long_string_ = processed_string;
66+
}
67+
68+
int main(int argc, char **argv) {
69+
rclcpp::init(argc, argv);
70+
auto node = std::make_shared<MultiThreadReentrantSubscriber>();
71+
rclcpp::executors::MultiThreadedExecutor executor;
72+
executor.add_node(node);
73+
executor.spin();
74+
rclcpp::shutdown();
75+
return 0;
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
<?xml version="1.0"?>
2+
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
3+
<package format="3">
4+
<name>simple_thread_tester</name>
5+
<version>0.0.0</version>
6+
<description>TODO: Package description</description>
7+
<maintainer email="[email protected]">jbs</maintainer>
8+
<license>TODO: License declaration</license>
9+
10+
<buildtool_depend>ament_cmake</buildtool_depend>
11+
12+
13+
<test_depend>ament_lint_auto</test_depend>
14+
<test_depend>ament_lint_common</test_depend>
15+
16+
<export>
17+
<build_type>ament_cmake</build_type>
18+
</export>
19+
</package>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
#include <random>
2+
3+
#include <rclcpp/rclcpp.hpp>
4+
#include <std_msgs/msg/string.hpp>
5+
6+
using namespace std::chrono_literals;
7+
8+
class PublisherNode : public rclcpp::Node {
9+
public:
10+
PublisherNode() : Node("publisher_node") {
11+
short_publisher_ = create_publisher<std_msgs::msg::String>(
12+
"/short_topic", rclcpp::SystemDefaultsQoS());
13+
long_publisher_ = create_publisher<std_msgs::msg::String>(
14+
"/long_topic", rclcpp::SystemDefaultsQoS());
15+
16+
short_timer_ =
17+
create_wall_timer(100ms, [this]() { PublishShortMessage(); });
18+
19+
long_timer_ = create_wall_timer(1s, [this]() { PublishLongMessage(); });
20+
}
21+
22+
private:
23+
size_t short_seq_{0};
24+
size_t long_seq_{0};
25+
26+
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr short_publisher_;
27+
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr long_publisher_;
28+
29+
rclcpp::TimerBase::SharedPtr short_timer_;
30+
rclcpp::TimerBase::SharedPtr long_timer_;
31+
32+
void PublishShortMessage() {
33+
std_msgs::msg::String message;
34+
message.data = "Short message (seq=" + std::to_string(short_seq_++) + ")";
35+
RCLCPP_INFO(get_logger(), message.data, " published.");
36+
short_publisher_->publish(message);
37+
}
38+
39+
void PublishLongMessage() {
40+
std_msgs::msg::String message;
41+
message.data = "Long message with more characters (seq=" +
42+
std::to_string(long_seq_++) + ")";
43+
RCLCPP_INFO(get_logger(), message.data, " published.");
44+
long_publisher_->publish(message);
45+
}
46+
};
47+
48+
int main(int argc, char **argv) {
49+
rclcpp::init(argc, argv);
50+
auto node = std::make_shared<PublisherNode>();
51+
rclcpp::spin(node);
52+
rclcpp::shutdown();
53+
return 0;
54+
}

0 commit comments

Comments
 (0)