Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion include/beman/execution/detail/bulk.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#ifndef INCLUDED_BEMAN_EXECUTION_DETAIL_BULK
#define INCLUDED_BEMAN_EXECUTION_DETAIL_BULK

#include "beman/execution/detail/sender_adaptor.hpp"
#include "beman/execution/detail/sender_adaptor_closure.hpp"
#include <beman/execution/detail/get_completion_signatures.hpp>
#include <beman/execution/detail/meta_combine.hpp>
#include <beman/execution/detail/meta_unique.hpp>
Expand All @@ -29,7 +31,13 @@
#include <beman/execution/detail/suppress_push.hpp>
namespace beman::execution::detail {

struct bulk_t {
struct bulk_t : ::beman::execution::sender_adaptor_closure<bulk_t> {

template <class Shape, class f>
requires(std::is_integral_v<Shape> && ::beman::execution::detail::movable_value<f>)
auto operator()(Shape&& shape, f&& fun) const {
return beman::execution::detail::sender_adaptor{*this, std::forward<Shape>(shape), std::forward<f>(fun)};
}

template <class Sender, class Shape, class f>
requires(::beman::execution::sender<Sender> && std::is_integral_v<Shape> &&
Expand Down
54 changes: 54 additions & 0 deletions tests/beman/execution/exec-bulk.test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,60 @@ auto test_bulk_noexept() {
ASSERT(counter == 10);
}

auto test_bulk_pipeable() {
auto b0 = test_std::just() | test_std::bulk(1, [](int) {});

static_assert(test_std::sender<decltype(b0)>);
auto b0_env = test_std::get_env(b0);
auto b0_completions = test_std::get_completion_signatures(b0, b0_env);
static_assert(
std::is_same_v<decltype(b0_completions),
beman::execution::completion_signatures<beman::execution::set_value_t(),
beman::execution::set_error_t(std::exception_ptr)> >,
"Completion signatures do not match!");

int counter = 0;

auto b1 = test_std::just() | test_std::bulk(5, [&](int i) { counter += i; });

static_assert(test_std::sender<decltype(b1)>);
auto b1_env = test_std::get_env(b0);
auto b1_completions = test_std::get_completion_signatures(b1, b1_env);
static_assert(
std::is_same_v<decltype(b1_completions),
beman::execution::completion_signatures<beman::execution::set_value_t(),
beman::execution::set_error_t(std::exception_ptr)> >,
"Completion signatures do not match!");
test_std::sync_wait(b1);
ASSERT(counter == 10);

std::vector<int> a{1, 2, 3, 4, 5, 6, 7, 8};
std::vector<int> b{9, 10, 11, 13, 14, 15, 16, 17};

std::vector<int> results(a.size(), 0);

auto b2 = test_std::just(a) | test_std::bulk(a.size(), [&](std::size_t index, const std::vector<int>& vec) {
results[index] = vec[index] * b[index];
});

static_assert(test_std::sender<decltype(b2)>);
auto b2_env = test_std::get_env(b2);
auto b2_completions = test_std::get_completion_signatures(b2, b2_env);
static_assert(
std::is_same_v<decltype(b2_completions),
beman::execution::completion_signatures<beman::execution::set_value_t(std::vector<int>),
beman::execution::set_error_t(std::exception_ptr)> >,
"Completion signatures do not match!");
test_std::sync_wait(b2);

// Expected results: element-wise multiplication of a and b
std::vector<int> expected{9, 20, 33, 52, 70, 90, 112, 136};

for (size_t i = 0; i < results.size(); ++i) {
ASSERT(results[i] == expected[i]);
}
}

} // namespace

TEST(exec_bulk) {
Expand Down