Skip to content

Commit 674b9c1

Browse files
committed
WIP add cancellation
1 parent 9345a83 commit 674b9c1

File tree

12 files changed

+463
-52
lines changed

12 files changed

+463
-52
lines changed

.clang-tidy

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ CheckOptions:
2424
- { key: readability-identifier-naming.ProtectedMemberSuffix, value: _ }
2525
- { key: readability-identifier-naming.MacroDefinitionCase, value: UPPER_CASE }
2626
- { key: readability-identifier-naming.EnumConstantCase, value: CamelCase }
27-
- { key: readability-identifier-naming.ConstexprVariableCase, value: CamelCase }
28-
- { key: readability-identifier-naming.ConstexprVariablePrefix, value: k }
2927
- { key: readability-identifier-naming.GlobalConstantCase, value: CamelCase }
3028
- { key: readability-identifier-naming.GlobalConstantPrefix, value: g }
3129
- { key: readability-identifier-naming.MemberConstantCase, value: CamelCase }

cppesphomeapi/include/cppesphomeapi/api_client.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define CPPESPHOMEAPI_API_CLIENT_HPP
33
#include <cstdint>
44
#include <memory>
5+
#include <stop_token>
56
#include <string>
67
#include <cppesphomeapi/cppesphomeapi_export.hpp>
78
#include "api_version.hpp"
@@ -22,6 +23,7 @@ class CPPESPHOMEAPI_EXPORT ApiClient
2223
{
2324
public:
2425
explicit ApiClient(const boost::asio::any_io_executor &executor,
26+
std::stop_source &stop_source,
2527
std::string hostname,
2628
std::uint16_t port = 6053,
2729
std::string password = "");
@@ -36,6 +38,7 @@ class CPPESPHOMEAPI_EXPORT ApiClient
3638
AsyncResult<void> async_light_command(LightCommand light_command);
3739
AsyncResult<void> enable_logs(EspHomeLogLevel log_level, bool config_dump);
3840
AsyncResult<LogEntry> receive_log();
41+
void close();
3942

4043
ApiClient(const ApiClient &) = delete;
4144
ApiClient(ApiClient &&) = delete;

cppesphomeapi/src/CMakeLists.txt

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,12 @@
1-
target_sources(cppesphomeapi PRIVATE
2-
api_client.cpp
3-
make_unexpected_result.cpp
4-
plain_text_protocol.cpp
5-
api_connection.cpp
6-
entity_conversion.cpp
7-
entity_conversion.hpp
1+
target_sources(cppesphomeapi
2+
PRIVATE
3+
api_client.cpp
4+
make_unexpected_result.cpp
5+
plain_text_protocol.cpp
6+
api_connection.cpp
7+
entity_conversion.cpp
8+
entity_conversion.hpp
9+
executor.hpp
10+
net.hpp
11+
net.cpp
812
)

cppesphomeapi/src/api_client.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@
33
namespace cppesphomeapi
44
{
55
ApiClient::ApiClient(const boost::asio::any_io_executor &executor,
6+
std::stop_source &stop_source,
67
std::string hostname,
78
std::uint16_t port,
89
std::string password)
9-
: connection_{std::make_unique<ApiConnection>(std::move(hostname), port, std::move(password), executor)}
10+
: connection_{
11+
std::make_unique<ApiConnection>(std::move(hostname), port, std::move(password), stop_source, executor)}
1012
{}
1113

1214
ApiClient::~ApiClient() = default;
@@ -56,4 +58,9 @@ const std::string &ApiClient::device_name() const
5658
return connection_->device_name();
5759
}
5860

61+
void ApiClient::close()
62+
{
63+
connection_->cancel();
64+
}
65+
5966
} // namespace cppesphomeapi

cppesphomeapi/src/api_connection.cpp

Lines changed: 66 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
#include <boost/asio.hpp>
44
#include "api.pb.h"
55
#include "entity_conversion.hpp"
6+
#include "executor.hpp"
67
#include "make_unexpected_result.hpp"
8+
#include "net.hpp"
79
#include "plain_text_protocol.hpp"
810

911
namespace asio = boost::asio;
@@ -23,25 +25,51 @@ namespace cppesphomeapi
2325
ApiConnection::ApiConnection(std::string hostname,
2426
std::uint16_t port,
2527
std::string password,
28+
std::stop_source &stop_source,
2629
const asio::any_io_executor &executor)
2730
: hostname_{std::move(hostname)}
2831
, port_{port}
2932
, password_{std::move(password)}
30-
, strand_{asio::make_strand(executor)}
31-
, socket_{strand_}
32-
{}
33+
, socket_{executor}
34+
{
35+
executor::addStopService(executor::getContext(executor), stop_source);
36+
}
37+
38+
void ApiConnection::cancel()
39+
{
40+
executor::stopAssetOf(executor::getContext(socket_.get_executor())).request_stop();
41+
}
3342

3443
AsyncResult<void> ApiConnection::connect()
3544
{
3645
auto executor = co_await this_coro::executor;
37-
tcp::resolver resolver{executor};
38-
const auto resolved = co_await resolver.async_resolve(hostname_, std::to_string(port_));
46+
net::Timer timer{executor};
47+
timer.expires_after(std::chrono::milliseconds{500});
48+
49+
auto endpoints = co_await net::resolveHostEndpoints(hostname_, net::Port{port_}, timer);
50+
if (not endpoints.has_value())
51+
{
52+
co_return make_unexpected_result(
53+
ApiErrorCode::UnexpectedMessage,
54+
std::format(
55+
"Could not resolve host {}:{}. Failed with error: {}", hostname_, port_, endpoints.error().message()));
56+
}
57+
58+
timer.expires_after(std::chrono::milliseconds{500});
59+
auto connect_result = co_await net::connectTo(*endpoints, timer);
60+
if (not connect_result.has_value())
61+
{
62+
co_return make_unexpected_result(ApiErrorCode::UnexpectedMessage,
63+
std::format("Could not connect to host {}:{}. Failed with error: {}",
64+
hostname_,
65+
port_,
66+
endpoints.error().message()));
67+
}
3968

40-
co_await socket_.async_connect(resolved->endpoint());
41-
socket_.set_option(asio::socket_base::keep_alive{true});
69+
socket_ = std::move(*connect_result);
4270

43-
asio::co_spawn(strand_, std::bind(&ApiConnection::async_receive, this), asio::detached);
44-
spawn_heartbeat();
71+
executor::commission(socket_.get_executor(), &ApiConnection::receive_loop, this);
72+
executor::commission(socket_.get_executor(), &ApiConnection::heartbeat_loop, this);
4573

4674
REQUIRE_SUCCESS(co_await send_message_hello());
4775
REQUIRE_SUCCESS(co_await send_message_connect());
@@ -165,14 +193,25 @@ AsyncResult<void> ApiConnection::send_message(const google::protobuf::Message &m
165193
if (packet.has_value())
166194
{
167195
std::println("Sending {}", message.GetTypeName());
168-
const auto written = co_await socket_.async_write_some(asio::buffer(packet.value()));
169-
if (written != packet->size())
196+
auto executor = co_await this_coro::executor;
197+
net::Timer timer{executor};
198+
const auto watch_dog = executor::abort(socket_, timer);
199+
timer.expires_after(std::chrono::milliseconds{500});
200+
201+
const auto written = co_await net::sendTo(socket_, timer, packet.value());
202+
if (written.has_value() && written.value() != packet->size())
170203
{
171204
co_return make_unexpected_result(
172205
ApiErrorCode::SendError,
173206
std::format("Could not send message. Bytes written are different: expected={}, written={}",
174207
packet->size(),
175-
written));
208+
*written));
209+
}
210+
else if (not written.has_value())
211+
{
212+
co_return make_unexpected_result(
213+
ApiErrorCode::SendError,
214+
std::format("Could not send message in time. Error: {}", written.error().message()));
176215
}
177216
co_return Result<void>{};
178217
}
@@ -207,15 +246,23 @@ AsyncResult<LogEntry> ApiConnection::receive_log()
207246
};
208247
}
209248

210-
boost::asio::awaitable<void> ApiConnection::async_receive()
249+
boost::asio::awaitable<void> ApiConnection::receive_loop()
211250
{
212-
namespace asio = boost::asio;
213-
std::array<std::uint8_t, 4096> buffer{};
214-
bool do_receive{true};
251+
std::array<std::byte, 4096> buffer{};
252+
auto executor = co_await this_coro::executor;
253+
net::Timer timer{executor};
254+
const auto watch_dog = executor::abort(socket_, timer);
215255

256+
bool do_receive{true};
216257
while (do_receive)
217258
{
218-
const auto received_bytes = co_await socket_.async_receive(asio::buffer(buffer));
259+
timer.expires_after(std::chrono::seconds{100}); // at least every 90secs. a message should be received
260+
const auto received_bytes = co_await net::receiveFrom(socket_, timer, buffer);
261+
if (not received_bytes.has_value())
262+
{
263+
std::println("Could not receive bytes. Error {}", received_bytes.error().message());
264+
break;
265+
}
219266
auto result = PlainTextProtocol{}
220267
.decode_multiple<proto::SubscribeLogsResponse,
221268
proto::DeviceInfoResponse,
@@ -247,7 +294,7 @@ boost::asio::awaitable<void> ApiConnection::async_receive()
247294
proto::ListEntitiesTimeResponse,
248295
proto::ListEntitiesUpdateResponse,
249296
proto::ListEntitiesValveResponse>(
250-
std::span{buffer.begin(), received_bytes}, [this](auto &&message) {
297+
std::span{buffer.cbegin(), received_bytes.value()}, [this](auto &&message) {
251298
std::vector<boost::asio::any_completion_handler<void(MessageWrapper)>> handlers;
252299
{
253300
std::unique_lock l{handler_mtx_};
@@ -276,11 +323,6 @@ boost::asio::awaitable<void> ApiConnection::async_receive()
276323
std::println("RECEIVE ENDED!");
277324
}
278325

279-
void ApiConnection::spawn_heartbeat()
280-
{
281-
asio::co_spawn(strand_, std::bind(&ApiConnection::heartbeat_loop, this), asio::detached);
282-
}
283-
284326
boost::asio::awaitable<void> ApiConnection::heartbeat_loop()
285327
{
286328
auto executor = co_await this_coro::executor;
@@ -296,6 +338,7 @@ boost::asio::awaitable<void> ApiConnection::heartbeat_loop()
296338
// otherwise the connection is dead.
297339
co_await receive_message<proto::PingResponse>(asio::use_awaitable);
298340
}
341+
executor::stopAssetOf(executor).request_stop();
299342
}
300343

301344
} // namespace cppesphomeapi

cppesphomeapi/src/api_connection.hpp

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
#pragma once
22
#include <cstdint>
3+
#include <stop_token>
34
#include <string>
45
#include <boost/asio/any_completion_handler.hpp>
56
#include <boost/asio/executor.hpp>
7+
#include <boost/asio/experimental/awaitable_operators.hpp>
68
#include <boost/asio/strand.hpp>
79
#include <boost/asio/use_awaitable.hpp>
810
#include <google/protobuf/message.h>
@@ -14,8 +16,8 @@
1416
#include "cppesphomeapi/device_info.hpp"
1517
#include "make_unexpected_result.hpp"
1618
#include "message_wrapper.hpp"
19+
#include "net.hpp"
1720
#include "overloaded.hpp"
18-
#include "tcp.hpp"
1921

2022
namespace cppesphomeapi
2123
{
@@ -25,6 +27,7 @@ class ApiConnection
2527
explicit ApiConnection(std::string hostname,
2628
std::uint16_t port,
2729
std::string password,
30+
std::stop_source &stop_source,
2831
const boost::asio::any_io_executor &executor);
2932

3033
AsyncResult<void> connect();
@@ -39,6 +42,8 @@ class ApiConnection
3942
AsyncResult<void> enable_logs(EspHomeLogLevel log_level, bool config_dump);
4043
AsyncResult<LogEntry> receive_log();
4144

45+
void cancel();
46+
4247
private:
4348
AsyncResult<void> send_message(const google::protobuf::Message &message);
4449

@@ -49,19 +54,29 @@ class ApiConnection
4954
std::unique_lock l{handler_mtx_};
5055
handlers_.emplace_back(std::move(handler));
5156
};
52-
return boost::asio::async_initiate<CompletionToken, void(MessageWrapper)>(
53-
init, // First, pass the function object that launches the operation,
54-
token // then the completion token that will be transformed to a handler,
55-
); // and, finally, any additional arguments to the function object.
57+
return boost::asio::async_initiate<CompletionToken, void(MessageWrapper)>(init, token);
5658
}
5759

5860
template <typename TMsg>
5961
auto receive_message(auto &&completion_token) -> AsyncResult<std::shared_ptr<TMsg>>
6062
{
63+
namespace aex = boost::asio::experimental;
64+
using aex::awaitable_operators::operator||;
65+
auto executor = co_await boost::asio::this_coro::executor;
66+
net::Timer timer{executor};
67+
timer.expires_after(std::chrono::milliseconds{250});
6168
while (true)
6269
{
63-
const auto received_message =
64-
co_await async_receive_message(std::forward<decltype(completion_token)>(completion_token));
70+
//! TODO: the added handler might get aborted. How to remove it from the vector holding the completion
71+
//! handlers?
72+
const auto received_message_or_error =
73+
co_await (async_receive_message(std::forward<decltype(completion_token)>(completion_token)) ||
74+
timer.async_wait());
75+
if (std::holds_alternative<std::tuple<net::ErrorCode>>(received_message_or_error))
76+
{
77+
break;
78+
}
79+
auto &&received_message = std::get<cppesphomeapi::MessageWrapper>(received_message_or_error);
6580
if (received_message.template holds_message<TMsg>())
6681
{
6782
co_return received_message.template as<TMsg>();
@@ -114,16 +129,14 @@ class ApiConnection
114129
}
115130

116131
private:
117-
boost::asio::awaitable<void> async_receive();
118-
void spawn_heartbeat();
132+
boost::asio::awaitable<void> receive_loop();
119133
boost::asio::awaitable<void> heartbeat_loop();
120134

121135
private:
122136
std::string hostname_;
123137
std::uint16_t port_;
124138
std::string password_;
125-
boost::asio::strand<boost::asio::any_io_executor> strand_;
126-
tcp::socket socket_;
139+
net::Socket socket_;
127140

128141
std::string device_name_;
129142
std::optional<ApiVersion> api_version_;

0 commit comments

Comments
 (0)