Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ If the size provided is not a power if 2, it's rounded up to the next power of 2

```cpp
// SPSC
fastchan::SPSC<int, blockingType, chan_size> c;
fastchan::SPSC<int, chan_size> c;
// OR
fastchan::SPSC<int, blockingType, chan_size, fastchan::WaitPause> c;
fastchan::SPSC<int, chan_size, fastchan::PauseWaitStrategy, fastchan::PauseWaitStrategy> c;

c.put(0);
c.put(1);
Expand Down
76 changes: 50 additions & 26 deletions include/spsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,66 +21,90 @@ class SPSC {
SPSC() = default;

put_t put(const T &value) noexcept {
while (next_free_index_2_ > (reader_index_.load(std::memory_order_acquire) + index_mask_)) {
while (producer_.next_free_index_2_ > (producer_.reader_index_cache_ + common_.index_mask_)) {
producer_.reader_index_cache_ = consumer_.reader_index_.load(std::memory_order_acquire);
if constexpr (std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value) {
return false;
} else {
put_wait_.wait([this] { return next_free_index_2_ <= (reader_index_.load(std::memory_order_acquire) + index_mask_); });
common_.put_wait_.wait(
[this] { return producer_.next_free_index_2_ <= (consumer_.reader_index_.load(std::memory_order_acquire) + common_.index_mask_); });
}
}

contents_[next_free_index_2_ & index_mask_] = value;
next_free_index_.store(++next_free_index_2_, std::memory_order_release);
contents_[producer_.next_free_index_2_ & common_.index_mask_] = value;
producer_.next_free_index_.store(++producer_.next_free_index_2_, std::memory_order_release);

get_wait_.notify();
common_.get_wait_.notify();

if constexpr (std::is_same<PutWaitStrategy, ReturnImmediateStrategy>::value) {
return true;
}
}

get_t get() noexcept {
while (reader_index_2_ >= next_free_index_.load(std::memory_order_acquire)) {
while (consumer_.reader_index_2_ >= consumer_.next_free_index_cache_) {
consumer_.next_free_index_cache_ = producer_.next_free_index_.load(std::memory_order_acquire);
if constexpr (std::is_same<GetWaitStrategy, ReturnImmediateStrategy>::value) {
return std::nullopt;
} else {
get_wait_.wait([this] { return reader_index_2_ < next_free_index_.load(std::memory_order_acquire); });
common_.get_wait_.wait([this] { return consumer_.reader_index_2_ < producer_.next_free_index_.load(std::memory_order_acquire); });
}
}

auto contents = contents_[reader_index_2_ & index_mask_];
reader_index_.store(++reader_index_2_, std::memory_order_release);
auto contents = contents_[consumer_.reader_index_2_ & common_.index_mask_];
consumer_.reader_index_.store(++consumer_.reader_index_2_, std::memory_order_release);

put_wait_.notify();
common_.put_wait_.notify();

return contents;
}

// For testing only
void empty() noexcept {
reader_index_2_ = 0;
next_free_index_2_ = 0;
next_free_index_.store(0, std::memory_order_release);
reader_index_.store(0, std::memory_order_release);
consumer_.next_free_index_cache_ = 0;
producer_.reader_index_cache_ = 0;
consumer_.reader_index_2_ = 0;
producer_.next_free_index_2_ = 0;
producer_.next_free_index_.store(0, std::memory_order_release);
consumer_.reader_index_.store(0, std::memory_order_release);
}

std::size_t size() const noexcept { return next_free_index_.load(std::memory_order_acquire) - reader_index_.load(std::memory_order_acquire); }
std::size_t size() const noexcept {
return producer_.next_free_index_.load(std::memory_order_acquire) - consumer_.reader_index_.load(std::memory_order_acquire);
}

bool isEmpty() const noexcept { return reader_index_.load(std::memory_order_acquire) >= next_free_index_.load(std::memory_order_acquire); }
bool isEmpty() const noexcept {
return consumer_.reader_index_.load(std::memory_order_acquire) >= producer_.next_free_index_.load(std::memory_order_acquire);
}

bool isFull() const noexcept { return next_free_index_.load(std::memory_order_relaxed) > (reader_index_.load(std::memory_order_acquire) + index_mask_); }
bool isFull() const noexcept {
return producer_.next_free_index_.load(std::memory_order_relaxed) > (consumer_.reader_index_.load(std::memory_order_acquire) + common_.index_mask_);
}

private:
std::array<T, roundUpNextPowerOfTwo(min_size)> contents_;

alignas(64) GetWaitStrategy get_wait_{};
alignas(64) PutWaitStrategy put_wait_{};
alignas(64) const std::size_t index_mask_ = roundUpNextPowerOfTwo(min_size) - 1;

alignas(64) std::size_t next_free_index_2_{0};
alignas(64) std::atomic<std::size_t> next_free_index_{0};

alignas(64) std::size_t reader_index_2_{0};
alignas(64) std::atomic<std::size_t> reader_index_{0};
struct alignas(64) Common {
GetWaitStrategy get_wait_{};
PutWaitStrategy put_wait_{};
const std::size_t index_mask_ = roundUpNextPowerOfTwo(min_size) - 1;
};

struct alignas(64) Producer {
std::size_t reader_index_cache_{0};
std::size_t next_free_index_2_{0};
std::atomic<std::size_t> next_free_index_{0};
};

struct alignas(64) Consumer {
std::size_t next_free_index_cache_{0};
std::size_t reader_index_2_{0};
std::atomic<std::size_t> reader_index_{0};
};

Common common_;
Producer producer_;
Consumer consumer_;
};
} // namespace fastchan