Skip to content

Commit a14ccc2

Browse files
committed
exec.split: implementation and test file
1 parent bbb1f09 commit a14ccc2

File tree

3 files changed

+163
-0
lines changed

3 files changed

+163
-0
lines changed
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// include/beman/execution26/detail/split.hpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_SPLIT
5+
#define INCLUDED_BEMAN_EXECUTION26_DETAIL_SPLIT
6+
7+
#include <beman/execution26/detail/atomic_intrusive_queue.hpp>
8+
#include <beman/execution26/detail/connect_result_t.hpp>
9+
#include <beman/execution26/detail/default_impls.hpp>
10+
#include <beman/execution26/detail/get_stop_token.hpp>
11+
#include <beman/execution26/detail/impls_for.hpp>
12+
#include <beman/execution26/detail/inplace_stop_source.hpp>
13+
#include <beman/execution26/detail/meta_combine.hpp>
14+
#include <beman/execution26/detail/meta_unique.hpp>
15+
#include <beman/execution26/detail/receiver.hpp>
16+
#include <beman/execution26/detail/value_types_of_t.hpp>
17+
18+
#include <atomic>
19+
#include <optional>
20+
#include <variant>
21+
#include <tuple>
22+
23+
// ----------------------------------------------------------------------------
24+
25+
namespace beman::execution26::detail {
26+
27+
struct split_t {};
28+
29+
template <>
30+
struct impls_for<split_t> : ::beman::execution26::detail::default_impls {
31+
struct local_state_base {
32+
local_state_base* next;
33+
virtual auto complete() noexcept -> void = 0;
34+
35+
protected:
36+
~local_state_base() = default;
37+
};
38+
39+
// [exec.split-9]
40+
template <class Sndr>
41+
struct shared_state;
42+
43+
template <class Sndr>
44+
struct split_receiver {
45+
using receiver_concept = ::beman::execution26::receiver_t;
46+
47+
split_receiver(shared_state<Sndr>* state) noexcept : sh_state(state) { state->inc_ref(); }
48+
49+
template <class Tag, class... Args>
50+
void complete(Tag, Args&&... args) noexcept {
51+
using tuple_t = ::beman::execution26::detail::decayed_tuple<Tag, Args...>;
52+
try {
53+
sh_state->result.template emplace<tuple_t>(Tag(), ::std::forward<Args>(args)...);
54+
} catch (...) {
55+
using tuple_err = ::std::tuple<::beman::execution26::set_error_t, ::std::exception_ptr>;
56+
sh_state->result.template emplace<tuple_err>(::beman::execution26::set_error,
57+
::std::current_exception());
58+
}
59+
const auto state = sh_state;
60+
state->inc_ref();
61+
state->notify();
62+
state->dec_ref();
63+
}
64+
65+
template <class... Args>
66+
void set_value(Args&&... args) && noexcept {
67+
complete(::beman::execution26::set_value, ::std::forward<Args>(args)...);
68+
}
69+
70+
template <class Error>
71+
void set_error(Error&& err) && noexcept {
72+
complete(::beman::execution26::set_error, ::std::forward<Error>(err));
73+
}
74+
75+
void set_stopped() && noexcept { complete(::beman::execution26::set_stopped); }
76+
77+
struct env {
78+
shared_state<Sndr>* sh_state;
79+
80+
::beman::execution26::inplace_stop_token query(::beman::execution26::get_stop_token_t) const noexcept {
81+
return sh_state->stop_src.get_token();
82+
}
83+
};
84+
85+
env get_env() const noexcept { return env{sh_state}; }
86+
87+
shared_state<Sndr>* sh_state;
88+
};
89+
90+
// [exec.split-10]
91+
template <class Sndr>
92+
struct shared_state {
93+
template <class Tag>
94+
struct tagged_tuple {
95+
template <class... Args>
96+
using invoke = ::std::tuple<Tag, ::std::decay_t<Args>...>;
97+
};
98+
99+
using variant_type = ::beman::execution26::detail::meta::unique<::beman::execution26::detail::meta::combine<
100+
::std::variant<::std::monostate>,
101+
::std::variant<::std::tuple<::beman::execution26::set_stopped_t>>,
102+
::std::variant<::std::tuple<::beman::execution26::set_error_t, ::std::exception_ptr>>,
103+
::beman::execution26::error_types_of_t<Sndr,
104+
typename shared_state<Sndr>::env,
105+
tagged_tuple<::beman::execution26::set_error_t>::template invoke>,
106+
::beman::execution26::value_types_of_t<Sndr,
107+
typename shared_state<Sndr>::env,
108+
tagged_tuple<::beman::execution26::set_value_t>::template invoke>>>;
109+
110+
using state_list_type = ::beman::execution26::detail::atomic_intrusive_queue<&local_state_base::next>;
111+
112+
explicit shared_state(Sndr&& sndr);
113+
114+
void start() noexcept {
115+
assert(op_state);
116+
::beman::execution26::start(*op_state);
117+
}
118+
119+
void notify() noexcept {
120+
op_state.reset();
121+
::beman::execution26::detail::intrusive_queue listeners = waiting_states.pop_all_and_shutdown();
122+
while (auto listener = listeners.pop()) {
123+
listener->complete();
124+
}
125+
}
126+
127+
void inc_ref() noexcept { ref_count.fetch_add(1); }
128+
129+
void dec_ref() noexcept {
130+
if (1 == ref_count.fetch_sub(1)) {
131+
delete this;
132+
}
133+
}
134+
135+
::beman::execution26::inplace_stop_source stop_src{};
136+
variant_type result{};
137+
state_list_type waiting_states{};
138+
::std::atomic<::std::size_t> ref_count{0};
139+
::std::optional<::beman::execution26::connect_result_t<Sndr, split_receiver<Sndr>>> op_state{};
140+
};
141+
};
142+
143+
} // namespace beman::execution26::detail
144+
145+
namespace beman::execution26 {
146+
using split_t = ::beman::execution26::detail::split_t;
147+
148+
inline constexpr ::beman::execution26::split_t split{};
149+
} // namespace beman::execution26
150+
151+
#endif

tests/beman/execution26/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ list(
4949
exec-snd-expos.test
5050
exec-snd-transform.test
5151
exec-starts-on.test
52+
exec-split.test
5253
exec-sync-wait.test
5354
exec-then.test
5455
exec-utils-cmplsigs.test
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
// src/beman/execution26/tests/split.test.cpp -*-C++-*-
2+
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
3+
4+
#include <beman/execution26/detail/split.hpp>
5+
#include <beman/execution26/execution.hpp>
6+
#include <test/execution.hpp>
7+
#include <concepts>
8+
9+
// ----------------------------------------------------------------------------
10+
11+
TEST(exec_split) {}

0 commit comments

Comments
 (0)