diff --git a/CMakeLists.txt b/CMakeLists.txt index 692276af..807c993b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -40,6 +40,7 @@ option( include(GNUInstallDirs) set(INSTALL_CONFIGDIR ${CMAKE_INSTALL_LIBDIR}/cmake/${PROJECT_NAME}) +if(FALSE) if(PROJECT_IS_TOP_LEVEL AND NOT BEMAN_EXECUTION_ENABLE_INSTALL OR CMAKE_SKIP_INSTALL_RULES) set(CMAKE_SKIP_INSTALL_RULES ON) @@ -90,6 +91,7 @@ if(PROJECT_IS_TOP_LEVEL AND NOT BEMAN_EXECUTION_ENABLE_INSTALL OR CMAKE_SKIP_INS # LINKER "lld" ) endif() +endif() add_subdirectory(src/beman/execution) diff --git a/include/beman/execution/detail/as_tuple.hpp b/include/beman/execution/detail/as_tuple.hpp new file mode 100644 index 00000000..1b9726c0 --- /dev/null +++ b/include/beman/execution/detail/as_tuple.hpp @@ -0,0 +1,25 @@ +// include/beman/execution/detail/as_tuple.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AS_TUPLE +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_AS_TUPLE + +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +template +struct as_tuple; +template +struct as_tuple { + using type = ::beman::execution::detail::decayed_tuple; +}; + +template +using as_tuple_t = typename ::beman::execution::detail::as_tuple::type; +} // namespace beman::execution::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/associate.hpp b/include/beman/execution/detail/associate.hpp new file mode 100644 index 00000000..f89adf6d --- /dev/null +++ b/include/beman/execution/detail/associate.hpp @@ -0,0 +1,161 @@ +// include/beman/execution/detail/associate.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_ASSOCIATE +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_ASSOCIATE + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include //-dk:TODO remove + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +template <::beman::execution::scope_token Token, ::beman::execution::sender Sender> +struct associate_data { + using wrap_sender = ::std::remove_cvref_t().wrap(::std::declval()))>; + + explicit associate_data(Token t, Sender&& s) : token(t), sender(this->token.wrap(::std::forward(s))) { + if (!token.try_associate()) { + this->sender.reset(); + } + } + associate_data(const associate_data& other) noexcept(::std::is_nothrow_copy_constructible_v && + noexcept(token.try_associate())) + : token(other.token), sender() { + if (other.sender && this->token.try_associate()) { + try { + this->sender.emplace(*other.sender); + } catch (...) { + this->token.disassociate(); + } + } + } + associate_data(associate_data&& other) noexcept(::std::is_nothrow_move_constructible_v) + : token(other.token), sender(::std::move(other.sender)) { + other.sender.reset(); + } + auto operator=(const associate_data&) -> associate_data& = delete; + auto operator=(associate_data&&) -> associate_data& = delete; + ~associate_data() { + if (this->sender) { + this->sender.reset(); + this->token.disassociate(); + } + } + + auto release() -> ::std::optional<::std::pair> { + return this->sender ? (std::unique_ptr, decltype([](auto* opt) { opt->reset(); })>( + &this->sender), + ::std::optional{::std::pair{::std::move(this->token), ::std::move(*this->sender)}}) + : ::std::optional<::std::pair>{}; + } + + Token token; + ::std::optional sender; +}; +template <::beman::execution::scope_token Token, ::beman::execution::sender Sender> +associate_data(Token, Sender&&) -> associate_data; + +struct associate_t { + template <::beman::execution::sender Sender, ::beman::execution::scope_token Token> + auto operator()(Sender&& sender, Token&& token) const { + auto domain(::beman::execution::detail::get_domain_early(sender)); + return ::beman::execution::transform_sender( + domain, + ::beman::execution::detail::make_sender( + *this, + ::beman::execution::detail::associate_data(::std::forward(token), + ::std::forward(sender)))); + } +}; + +template <> +struct impls_for : ::beman::execution::detail::default_impls { + template + struct get_noexcept : ::std::false_type {}; + template + struct get_noexcept<::beman::execution::detail::basic_sender, Receiver> + : ::std::bool_constant< + ::std::is_nothrow_move_constructible_v::wrap_sender>&& ::beman:: + execution::detail::nothrow_callable<::beman::execution::connect_t, + typename ::std::remove_cvref_t::wrap_sender, + Receiver>> {}; + + static constexpr auto get_state = + [](Sender&& sender, Receiver& receiver) noexcept( + ::std::is_nothrow_constructible_v<::std::remove_cvref_t, Sender>&& + get_noexcept<::std::remove_cvref_t, Receiver>::value) { + auto [_, data] = ::std::forward(sender); + auto dataParts{data.release()}; + + using scope_token = decltype(dataParts->first); + using wrap_sender = decltype(dataParts->second); + using op_t = decltype(::beman::execution::connect(::std::move(dataParts->second), + ::std::forward(receiver))); + + struct op_state { + using sop_t = op_t; + using sscope_token = scope_token; + bool associated{false}; + union { + Receiver* rcvr; + struct { + sscope_token tok; + sop_t op; + } assoc; + }; + explicit op_state(Receiver& r) noexcept : rcvr(::std::addressof(r)) {} + explicit op_state(sscope_token tk, wrap_sender&& sndr, Receiver& r) try + : associated(true), assoc(tk, ::beman::execution::connect(::std::move(sndr), ::std::move(r))) { + } catch (...) { + tk.disassociate(); + throw; + } + op_state(op_state&&) = delete; + ~op_state() { + if (this->associated) { + this->assoc.op.~sop_t(); + this->assoc.tok.disassociate(); + this->assoc.tok.~sscope_token(); + } + } + auto run() noexcept -> void { + if (this->associated) { + ::beman::execution::start(this->assoc.op); + } else { + ::beman::execution::set_stopped(::std::move(*this->rcvr)); + } + } + }; + return dataParts ? op_state(::std::move(dataParts->first), ::std::move(dataParts->second), receiver) + : op_state(receiver); + }; + static constexpr auto start = [](auto& state, auto&&) noexcept -> void { state.run(); }; +}; + +template +struct completion_signatures_for_impl< + ::beman::execution::detail::basic_sender<::beman::execution::detail::associate_t, Data>, + Env> { + using type = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>; +}; +} // namespace beman::execution::detail + +namespace beman::execution { +using associate_t = ::beman::execution::detail::associate_t; +inline constexpr associate_t associate{}; +} // namespace beman::execution + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/counting_scope.hpp b/include/beman/execution/detail/counting_scope.hpp new file mode 100644 index 00000000..46370e7f --- /dev/null +++ b/include/beman/execution/detail/counting_scope.hpp @@ -0,0 +1,60 @@ +// include/beman/execution/detail/counting_scope.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_COUNTING_SCOPE +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_COUNTING_SCOPE + +#include +#include +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution { +class counting_scope; +} + +// ---------------------------------------------------------------------------- + +class beman::execution::counting_scope : public ::beman::execution::detail::counting_scope_base { + public: + class token; + + auto join() noexcept -> ::beman::execution::sender auto { + return ::beman::execution::detail::counting_scope_join(this); + } + auto get_token() noexcept -> token; + auto request_stop() noexcept -> void { this->stop_source.request_stop(); } + + private: + ::beman::execution::inplace_stop_source stop_source{}; +}; + +// ---------------------------------------------------------------------------- + +class beman::execution::counting_scope::token : public ::beman::execution::detail::counting_scope_base::token { + public: + template <::beman::execution::sender Sender> + auto wrap(Sender&& sender) const noexcept -> ::beman::execution::sender auto { + return ::beman::execution::detail::stop_when( + ::std::forward(sender), + static_cast<::beman::execution::counting_scope*>(this->scope)->stop_source.get_token()); + } + + private: + friend class beman::execution::counting_scope; + explicit token(::beman::execution::counting_scope* s) + : ::beman::execution::detail::counting_scope_base::token(s) {} +}; + +// ---------------------------------------------------------------------------- + +inline auto beman::execution::counting_scope::get_token() noexcept -> beman::execution::counting_scope::token { + return beman::execution::counting_scope::token(this); +} + +#endif diff --git a/include/beman/execution/detail/counting_scope_base.hpp b/include/beman/execution/detail/counting_scope_base.hpp new file mode 100644 index 00000000..56ca4f5e --- /dev/null +++ b/include/beman/execution/detail/counting_scope_base.hpp @@ -0,0 +1,166 @@ +// include/beman/execution/detail/counting_scope_base.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_COUNTING_SCOPE_BASE +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_COUNTING_SCOPE_BASE + +#include +#include +#include +#include +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +struct counting_scope_base; +} + +// ---------------------------------------------------------------------------- + +class beman::execution::detail::counting_scope_base : ::beman::execution::detail::immovable { + public: + counting_scope_base() = default; + counting_scope_base(counting_scope_base&&) = delete; + ~counting_scope_base(); + + static constexpr ::std::size_t max_associations{8194u}; + + auto close() noexcept -> void; + + struct node { + virtual auto complete() noexcept -> void = 0; + virtual auto complete_inline() noexcept -> void = 0; + node* next{}; + }; + auto start_node(node*) -> void; + + protected: + class token { + public: + auto try_associate() const noexcept -> bool { return this->scope->try_associate(); } + auto disassociate() const noexcept -> void { this->scope->disassociate(); } + + protected: + explicit token(::beman::execution::detail::counting_scope_base* s) : scope(s) {} + ::beman::execution::detail::counting_scope_base* scope; + }; + + private: + enum class state_t : unsigned char { + unused, + open, + open_and_joining, + closed, + closed_and_joining, + unused_and_closed, + joined + }; + + auto try_associate() noexcept -> bool; + auto disassociate() noexcept -> void; + auto complete() noexcept -> void; + auto add_node(node* n, ::std::lock_guard<::std::mutex>&) noexcept -> void; + + ::std::mutex mutex; + //-dk:TODO fuse state and count and use atomic accesses + ::std::size_t count{}; + state_t state{state_t::unused}; + node* head{}; +}; + +// ---------------------------------------------------------------------------- + +beman::execution::detail::counting_scope_base::~counting_scope_base() { + ::std::lock_guard kerberos(this->mutex); + switch (this->state) { + default: + ::std::terminate(); + case state_t::unused: + case state_t::unused_and_closed: + case state_t::joined: + break; + } +} + +auto beman::execution::detail::counting_scope_base::close() noexcept -> void { + switch (this->state) { + default: + break; + case state_t::unused: + this->state = state_t::unused_and_closed; + break; + case state_t::open: + this->state = state_t::closed; + break; + case state_t::open_and_joining: + this->state = state_t::closed_and_joining; + break; + } +} + +auto beman::execution::detail::counting_scope_base::add_node(node* n, ::std::lock_guard<::std::mutex>&) noexcept + -> void { + n->next = std::exchange(this->head, n); +} + +auto beman::execution::detail::counting_scope_base::try_associate() noexcept -> bool { + ::std::lock_guard lock(this->mutex); + switch (this->state) { + default: + return false; + case state_t::unused: + this->state = state_t::open; // fall-through! + [[fallthrough]]; + case state_t::open: + case state_t::open_and_joining: + ++this->count; + return true; + } +} +auto beman::execution::detail::counting_scope_base::disassociate() noexcept -> void { + { + ::std::lock_guard lock(this->mutex); + if (0u < --this->count) + return; + this->state = state_t::joined; + } + this->complete(); +} + +auto beman::execution::detail::counting_scope_base::complete() noexcept -> void { + node* current{[this] { + ::std::lock_guard lock(this->mutex); + return ::std::exchange(this->head, nullptr); + }()}; + while (current) { + ::std::exchange(current, current->next)->complete(); + } +} + +auto beman::execution::detail::counting_scope_base::start_node(node* n) -> void { + ::std::lock_guard kerberos(this->mutex); + switch (this->state) { + case ::beman::execution::detail::counting_scope_base::state_t::unused: + case ::beman::execution::detail::counting_scope_base::state_t::unused_and_closed: + case ::beman::execution::detail::counting_scope_base::state_t::joined: + this->state = ::beman::execution::detail::counting_scope_base::state_t::joined; + n->complete_inline(); + return; + case ::beman::execution::detail::counting_scope_base::state_t::open: + this->state = ::beman::execution::detail::counting_scope_base::state_t::open_and_joining; + break; + case ::beman::execution::detail::counting_scope_base::state_t::open_and_joining: + break; + case ::beman::execution::detail::counting_scope_base::state_t::closed: + this->state = ::beman::execution::detail::counting_scope_base::state_t::closed_and_joining; + break; + case ::beman::execution::detail::counting_scope_base::state_t::closed_and_joining: + break; + } + this->add_node(n, kerberos); +} + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/counting_scope_join.hpp b/include/beman/execution/detail/counting_scope_join.hpp new file mode 100644 index 00000000..0673242c --- /dev/null +++ b/include/beman/execution/detail/counting_scope_join.hpp @@ -0,0 +1,87 @@ +// include/beman/execution/detail/counting_scope_join.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_COUNTING_SCOPE_JOIN +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_COUNTING_SCOPE_JOIN + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +struct counting_scope_join_t { + template <::beman::execution::receiver> + struct state; + + auto operator()(::beman::execution::detail::counting_scope_base* ptr) const { + return ::beman::execution::detail::make_sender(*this, ptr); + } +}; +inline constexpr counting_scope_join_t counting_scope_join{}; + +template +struct completion_signatures_for_impl< + ::beman::execution::detail::basic_sender<::beman::execution::detail::counting_scope_join_t, + ::beman::execution::detail::counting_scope_base*>, + Env> { + using type = ::beman::execution::completion_signatures<::beman::execution::set_value_t()>; +}; + +} // namespace beman::execution::detail + +// ---------------------------------------------------------------------------- + +template <::beman::execution::receiver Receiver> +struct beman::execution::detail::counting_scope_join_t::state : ::beman::execution::detail::counting_scope_base::node { + using op_t = decltype(::beman::execution::connect(::beman::execution::schedule(::beman::execution::get_scheduler( + ::beman::execution::get_env(::std::declval()))), + ::std::declval())); + + ::beman::execution::detail::counting_scope_base* scope; + explicit state(::beman::execution::detail::counting_scope_base* s, Receiver& r) + : scope(s), + receiver(r), + op(::beman::execution::connect(::beman::execution::schedule(::beman::execution::get_scheduler( + ::beman::execution::get_env(this->receiver))), + this->receiver)) {} + virtual ~state() = default; + + auto complete() noexcept -> void override { ::beman::execution::start(this->op); } + auto complete_inline() noexcept -> void override { ::beman::execution::set_value(::std::move(this->receiver)); } + + auto start() noexcept -> void { this->scope->start_node(this); } + + ::std::remove_cvref_t& receiver; + op_t op; +}; + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail { +template <> +struct impls_for<::beman::execution::detail::counting_scope_join_t> : ::beman::execution::detail::default_impls { + static constexpr auto get_state = [](auto&& sender, Receiver& receiver) noexcept(false) { + auto [_, self] = sender; + return ::beman::execution::detail::counting_scope_join_t::state(self, receiver); + }; + static constexpr auto start = [](auto& s, auto&) noexcept { s.start(); }; +}; +} // namespace beman::execution::detail + +// ---------------------------------------------------------------------------- + +#endif diff --git a/include/beman/execution/detail/meta_contain_same.hpp b/include/beman/execution/detail/meta_contain_same.hpp new file mode 100644 index 00000000..c5874e8d --- /dev/null +++ b/include/beman/execution/detail/meta_contain_same.hpp @@ -0,0 +1,27 @@ +// include/beman/execution/detail/meta_contain_same.hpp -*-C++-*- +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception + +#ifndef INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_META_CONTAIN_SAME +#define INCLUDED_INCLUDE_BEMAN_EXECUTION_DETAIL_META_CONTAIN_SAME + +#include + +// ---------------------------------------------------------------------------- + +namespace beman::execution::detail::meta { +template +struct contain_same_t; +template