Skip to content

Commit ea6737a

Browse files
authored
Support component nodes by changing CoContext to accept Node& instead of SharedPtr (#23)
1 parent 626f026 commit ea6737a

33 files changed

Lines changed: 320 additions & 87 deletions

README.md

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,14 @@ colcon build --packages-select rclcpp_async
5151
rclcpp_async::Task<void> run(rclcpp_async::CoContext & ctx)
5252
{
5353
co_await ctx.sleep(std::chrono::seconds(1));
54-
RCLCPP_INFO(ctx.node()->get_logger(), "Hello from coroutine!");
54+
RCLCPP_INFO(ctx.node().get_logger(), "Hello from coroutine!");
5555
}
5656

5757
int main(int argc, char * argv[])
5858
{
5959
rclcpp::init(argc, argv);
6060
auto node = std::make_shared<rclcpp::Node>("hello");
61-
rclcpp_async::CoContext ctx(node);
61+
rclcpp_async::CoContext ctx(*node);
6262

6363
auto task = ctx.create_task(run(ctx));
6464

@@ -110,7 +110,7 @@ Task<void> listen(CoContext & ctx, std::string topic)
110110
if (!msg.has_value()) {
111111
break;
112112
}
113-
RCLCPP_INFO(ctx.node()->get_logger(), "Heard: %s", (*msg)->data.c_str());
113+
RCLCPP_INFO(ctx.node().get_logger(), "Heard: %s", (*msg)->data.c_str());
114114
}
115115
}
116116
```
@@ -133,12 +133,12 @@ using SetBool = std_srvs::srv::SetBool;
133133

134134
Task<void> call_service(CoContext & ctx)
135135
{
136-
auto client = ctx.node()->create_client<SetBool>("set_bool");
136+
auto client = ctx.node().create_client<SetBool>("set_bool");
137137

138138
// Wait for the service to become available
139139
auto wait_result = co_await ctx.wait_for_service(client, std::chrono::seconds(10));
140140
if (!wait_result.ok()) {
141-
RCLCPP_ERROR(ctx.node()->get_logger(), "Service not available");
141+
RCLCPP_ERROR(ctx.node().get_logger(), "Service not available");
142142
co_return;
143143
}
144144

@@ -147,7 +147,7 @@ Task<void> call_service(CoContext & ctx)
147147
req->data = true;
148148

149149
auto resp = co_await ctx.send_request<SetBool>(client, req);
150-
RCLCPP_INFO(ctx.node()->get_logger(), "Response: %s", resp->message.c_str());
150+
RCLCPP_INFO(ctx.node().get_logger(), "Response: %s", resp->message.c_str());
151151
}
152152
```
153153
@@ -163,7 +163,7 @@ Task<void> tick(CoContext & ctx)
163163
164164
while (true) {
165165
co_await timer->next();
166-
RCLCPP_INFO(ctx.node()->get_logger(), "Tick %d", count++);
166+
RCLCPP_INFO(ctx.node().get_logger(), "Tick %d", count++);
167167
}
168168
}
169169
```
@@ -273,7 +273,7 @@ Send a goal and iterate over feedback with `GoalStream`:
273273
```cpp
274274
Task<void> send_action(CoContext & ctx)
275275
{
276-
auto client = rclcpp_action::create_client<Fibonacci>(ctx.node(), "fibonacci");
276+
auto client = rclcpp_action::create_client<Fibonacci>(&ctx.node(), "fibonacci");
277277

278278
auto wait_result = co_await ctx.wait_for_action(client, std::chrono::seconds(10));
279279
if (!wait_result.ok()) { co_return; }
@@ -293,13 +293,13 @@ Task<void> send_action(CoContext & ctx)
293293
break;
294294
}
295295
auto & seq = (*feedback)->sequence;
296-
RCLCPP_INFO(ctx.node()->get_logger(), "Feedback: last=%d", seq.back());
296+
RCLCPP_INFO(ctx.node().get_logger(), "Feedback: last=%d", seq.back());
297297
}
298298

299299
// Get final result
300300
auto result = stream->result();
301301
auto & seq = result.result->sequence;
302-
RCLCPP_INFO(ctx.node()->get_logger(), "Result: last=%d", seq.back());
302+
RCLCPP_INFO(ctx.node().get_logger(), "Result: last=%d", seq.back());
303303
}
304304
```
305305
@@ -313,7 +313,7 @@ rclcpp_async::Event event(ctx);
313313
// Task 1: wait for the event
314314
auto waiter = ctx.create_task([&]() -> Task<void> {
315315
co_await event.wait();
316-
RCLCPP_INFO(ctx.node()->get_logger(), "Event received!");
316+
RCLCPP_INFO(ctx.node().get_logger(), "Event received!");
317317
});
318318
319319
// Task 2: signal the event
@@ -340,7 +340,7 @@ rclcpp_async::Mutex mutex(ctx);
340340
Task<void> critical_section(CoContext & ctx, Mutex & mutex, const std::string & name)
341341
{
342342
co_await mutex.lock();
343-
RCLCPP_INFO(ctx.node()->get_logger(), "%s: acquired lock", name.c_str());
343+
RCLCPP_INFO(ctx.node().get_logger(), "%s: acquired lock", name.c_str());
344344
co_await ctx.sleep(std::chrono::seconds(1));
345345
mutex.unlock();
346346
}
@@ -364,7 +364,7 @@ Task<void> run(CoContext & ctx)
364364
// Sync lookup (latest) -- returns nullopt if not yet available
365365
auto opt = tf.lookup_transform("map", "base_link");
366366
if (opt) {
367-
RCLCPP_INFO(ctx.node()->get_logger(), "x=%.2f", opt->transform.translation.x);
367+
RCLCPP_INFO(ctx.node().get_logger(), "x=%.2f", opt->transform.translation.x);
368368
}
369369
370370
// Async lookup -- suspends until the transform becomes available
@@ -403,7 +403,7 @@ auto task = ctx.create_task([&]() -> Task<void> {
403403
if (!val.has_value()) {
404404
break; // Channel closed
405405
}
406-
RCLCPP_INFO(ctx.node()->get_logger(), "Received: %d", *val);
406+
RCLCPP_INFO(ctx.node().get_logger(), "Received: %d", *val);
407407
}
408408
});
409409
```
@@ -483,14 +483,14 @@ Task<void> run(CoContext & ctx)
483483
```cpp
484484
Task<void> run(CoContext & ctx)
485485
{
486-
auto client = ctx.node()->create_client<SetBool>("set_bool");
486+
auto client = ctx.node().create_client<SetBool>("set_bool");
487487
488488
// Race an awaitable against a timeout
489489
auto req = std::make_shared<SetBool::Request>();
490490
req->data = true;
491491
auto resp = co_await ctx.wait_for(ctx.send_request<SetBool>(client, req), 5s);
492492
if (resp.ok()) {
493-
RCLCPP_INFO(ctx.node()->get_logger(), "Response: %s", resp.value->message.c_str());
493+
RCLCPP_INFO(ctx.node().get_logger(), "Response: %s", resp.value->message.c_str());
494494
}
495495
}
496496
```
@@ -546,7 +546,7 @@ Task<void> run(CoContext & ctx)
546546
try {
547547
co_await ctx.sleep(std::chrono::seconds(10));
548548
} catch (const CancelledException &) {
549-
RCLCPP_INFO(ctx.node()->get_logger(), "Cancelled!");
549+
RCLCPP_INFO(ctx.node().get_logger(), "Cancelled!");
550550
}
551551
}
552552
```
@@ -579,7 +579,7 @@ See [`nested_demo.cpp`](rclcpp_async_example/src/nested_demo.cpp) for a full dem
579579
| `when_all(awaitables...)` | *awaitable* `tuple<Ts...>` | Await all tasks/awaitables concurrently |
580580
| `when_any(awaitables...)` | *awaitable* `variant<Ts...>` | Race tasks/awaitables, return first result |
581581
| `post(fn)` | `void` | Post a callback to the executor thread (thread-safe) |
582-
| `node()` | `Node::SharedPtr` | Access the underlying node |
582+
| `node()` | `Node&` | Access the underlying node |
583583
584584
### TfBuffer
585585

rclcpp_async/include/rclcpp_async/co_context.hpp

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -116,22 +116,25 @@ class DrainWaitable : public rclcpp::Waitable
116116
class CoContext
117117
{
118118
std::shared_ptr<DrainWaitable> drain_;
119-
rclcpp::Node::SharedPtr node_;
119+
rclcpp::Node & node_;
120120

121121
public:
122-
explicit CoContext(rclcpp::Node::SharedPtr node) : node_(std::move(node))
122+
explicit CoContext(rclcpp::Node & node) : node_(node)
123123
{
124124
drain_ = std::make_shared<DrainWaitable>();
125-
node_->get_node_waitables_interface()->add_waitable(drain_, nullptr);
125+
node_.get_node_waitables_interface()->add_waitable(drain_, nullptr);
126126
}
127127

128+
CoContext(const CoContext &) = delete;
129+
CoContext & operator=(const CoContext &) = delete;
130+
128131
// Post a callback to be executed on the executor thread (thread-safe).
129132
void post(std::function<void()> fn) { drain_->post(std::move(fn)); }
130133

131134
// Resume a coroutine directly. Call only from the executor thread.
132135
void resume(std::coroutine_handle<> h) { h.resume(); }
133136

134-
rclcpp::Node::SharedPtr node() { return node_; }
137+
rclcpp::Node & node() { return node_; }
135138

136139
template <typename T>
137140
[[nodiscard]] Task<T> create_task(Task<T> task)
@@ -182,7 +185,7 @@ class CoContext
182185
std::shared_ptr<TopicStream<MsgT>> subscribe(const std::string & topic, const rclcpp::QoS & qos)
183186
{
184187
auto stream = std::make_shared<TopicStream<MsgT>>(*this);
185-
stream->sub_ = node_->template create_subscription<MsgT>(
188+
stream->sub_ = node_.template create_subscription<MsgT>(
186189
topic, qos, [s = stream](typename MsgT::SharedPtr msg) {
187190
if (s->closed_) {
188191
return;
@@ -224,13 +227,13 @@ class CoContext
224227
srv->send_response(*req_id, response);
225228
}(cb, std::move(service_handle), std::move(request_id), std::move(request));
226229
};
227-
return node_->template create_service<ServiceT>(name, std::move(handler));
230+
return node_.template create_service<ServiceT>(name, std::move(handler));
228231
}
229232

230233
std::shared_ptr<TimerStream> create_timer(std::chrono::nanoseconds period)
231234
{
232235
auto stream = std::make_shared<TimerStream>(*this);
233-
stream->timer_ = node_->create_wall_timer(period, [s = stream, this]() {
236+
stream->timer_ = node_.create_wall_timer(period, [s = stream, this]() {
234237
if (s->waiter_) {
235238
auto w = s->waiter_;
236239
s->waiter_ = nullptr;
@@ -286,7 +289,9 @@ class CoContext
286289
using GoalHandleT = rclcpp_action::ServerGoalHandle<ActionT>;
287290

288291
return rclcpp_action::create_server<ActionT>(
289-
node_, name, std::forward<GoalCbT>(goal_callback), std::forward<CancelCbT>(cancel_callback),
292+
node_.get_node_base_interface(), node_.get_node_clock_interface(),
293+
node_.get_node_logging_interface(), node_.get_node_waitables_interface(), name,
294+
std::forward<GoalCbT>(goal_callback), std::forward<CancelCbT>(cancel_callback),
290295
[cb = std::forward<CallbackT>(callback)](
291296
const std::shared_ptr<GoalHandleT> goal_handle) mutable {
292297
[](CallbackT & cb, std::shared_ptr<GoalHandleT> gh) -> SpawnedTask {
@@ -377,7 +382,7 @@ inline void SleepAwaiter::await_suspend(std::coroutine_handle<> h)
377382
ctx.resume(h);
378383
};
379384

380-
timer = ctx.node()->create_wall_timer(duration, [finish]() { finish(); });
385+
timer = ctx.node().create_wall_timer(duration, [finish]() { finish(); });
381386

382387
register_cancel(
383388
cancel_cb_, token, ctx, h, [this]() { return done; },

rclcpp_async/include/rclcpp_async/task.hpp

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,21 @@ struct Task
129129
{
130130
o.handle = nullptr;
131131
}
132-
Task & operator=(Task &&) = delete;
132+
Task & operator=(Task && o) noexcept
133+
{
134+
if (this != &o) {
135+
if (handle) {
136+
handle.destroy();
137+
}
138+
handle = o.handle;
139+
started_ = o.started_;
140+
parent_cancel_cb_ = std::move(o.parent_cancel_cb_);
141+
o.handle = nullptr;
142+
}
143+
return *this;
144+
}
133145
Task(const Task &) = delete;
146+
Task() noexcept : handle(nullptr) {}
134147

135148
private:
136149
explicit Task(std::coroutine_handle<promise_type> h) : handle(h) {}
@@ -231,8 +244,21 @@ struct Task<void>
231244
{
232245
o.handle = nullptr;
233246
}
234-
Task & operator=(Task &&) = delete;
247+
Task & operator=(Task && o) noexcept
248+
{
249+
if (this != &o) {
250+
if (handle) {
251+
handle.destroy();
252+
}
253+
handle = o.handle;
254+
started_ = o.started_;
255+
parent_cancel_cb_ = std::move(o.parent_cancel_cb_);
256+
o.handle = nullptr;
257+
}
258+
return *this;
259+
}
235260
Task(const Task &) = delete;
261+
Task() noexcept : handle(nullptr) {}
236262

237263
private:
238264
explicit Task(std::coroutine_handle<promise_type> h) : handle(h) {}

rclcpp_async/include/rclcpp_async/tf_buffer.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,10 @@ class TfBuffer
5959
explicit TfBuffer(CoContext & ctx)
6060
: ctx_(ctx),
6161
tf_node_(std::make_shared<rclcpp::Node>(
62-
"_tf_listener", ctx.node()->get_namespace(),
62+
"_tf_listener", ctx.node().get_namespace(),
6363
rclcpp::NodeOptions().start_parameter_services(false).start_parameter_event_publisher(
6464
false))),
65-
buffer_(std::make_shared<tf2_ros::Buffer>(ctx.node()->get_clock()))
65+
buffer_(std::make_shared<tf2_ros::Buffer>(ctx.node().get_clock()))
6666
{
6767
auto cb = [this](tf2_msgs::msg::TFMessage::ConstSharedPtr msg) {
6868
on_tf_message(std::move(msg), false);

rclcpp_async/test/benchmark/bench_concurrent.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ int main(int argc, char ** argv)
130130
rclcpp::init(argc, argv);
131131

132132
auto node = std::make_shared<rclcpp::Node>("bench_concurrent_node");
133-
auto ctx = std::make_unique<CoContext>(node);
133+
auto ctx = std::make_unique<CoContext>(*node);
134134
rclcpp::executors::SingleThreadedExecutor executor;
135135
executor.add_node(node);
136136

rclcpp_async/test/benchmark/bench_pubsub.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ int main(int argc, char ** argv)
5050
rclcpp::init(argc, argv);
5151

5252
auto node = std::make_shared<rclcpp::Node>("bench_pubsub_node");
53-
auto ctx = std::make_unique<CoContext>(node);
53+
auto ctx = std::make_unique<CoContext>(*node);
5454
rclcpp::executors::SingleThreadedExecutor executor;
5555
executor.add_node(node);
5656

rclcpp_async/test/benchmark/bench_service.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ int main(int argc, char ** argv)
4646
rclcpp::init(argc, argv);
4747

4848
auto node = std::make_shared<rclcpp::Node>("bench_service_node");
49-
auto ctx = std::make_unique<CoContext>(node);
49+
auto ctx = std::make_unique<CoContext>(*node);
5050
rclcpp::executors::SingleThreadedExecutor executor;
5151
executor.add_node(node);
5252

rclcpp_async/test/benchmark/bench_sleep.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ int main(int argc, char ** argv)
9292
rclcpp::init(argc, argv);
9393

9494
auto node = std::make_shared<rclcpp::Node>("bench_sleep_node");
95-
auto ctx = std::make_unique<CoContext>(node);
95+
auto ctx = std::make_unique<CoContext>(*node);
9696
rclcpp::executors::SingleThreadedExecutor executor;
9797
executor.add_node(node);
9898

rclcpp_async/test/test_chained_goals.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ class ChainedGoalsTest : public ::testing::Test
4545
void SetUp() override
4646
{
4747
node_ = std::make_shared<rclcpp::Node>("test_chained_goals_node");
48-
ctx_ = std::make_unique<CoContext>(node_);
48+
ctx_ = std::make_unique<CoContext>(*node_);
4949
executor_.add_node(node_);
5050
action_client_ = rclcpp_action::create_client<Fibonacci>(node_, "test_chained_action");
5151
}

rclcpp_async/test/test_channel.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ class ChannelTest : public ::testing::Test
3232
void SetUp() override
3333
{
3434
node_ = std::make_shared<rclcpp::Node>("test_channel_node");
35-
ctx_ = std::make_unique<CoContext>(node_);
35+
ctx_ = std::make_unique<CoContext>(*node_);
3636
executor_.add_node(node_);
3737
}
3838

0 commit comments

Comments
 (0)