Skip to content

Commit d37422f

Browse files
committed
mpmc: use heapless API for crossbeam impl
Ports the `crossbeam::ArrayQueue` impl for MPMC to using a `heapless`-compatible API.
1 parent e36e924 commit d37422f

File tree

4 files changed

+948
-286
lines changed

4 files changed

+948
-286
lines changed

Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,15 @@ zeroize = ["dep:zeroize"]
5353
# Enable larger MPMC sizes.
5454
mpmc_large = []
5555

56+
# Enable crossbeam ArrayQueue MPMC implementation.
57+
mpmc_crossbeam = ["dep:crossbeam-utils"]
58+
5659
# Implement some alloc Vec interoperability
5760
alloc = []
5861

5962
nightly = []
6063

64+
6165
[dependencies]
6266
bytes = { version = "1", default-features = false, optional = true }
6367
portable-atomic = { version = "1.0", optional = true }
@@ -67,6 +71,7 @@ ufmt = { version = "0.2", optional = true }
6771
ufmt-write = { version = "0.1", optional = true }
6872
defmt = { version = "1.0.1", optional = true }
6973
zeroize = { version = "1.8", optional = true, default-features = false, features = ["derive"] }
74+
crossbeam-utils = { version = "0.8", optional = true }
7075

7176
# for the pool module
7277
[target.'cfg(any(target_arch = "arm", target_pointer_width = "32", target_pointer_width = "64"))'.dependencies]

src/mpmc.rs

Lines changed: 10 additions & 286 deletions
Original file line numberDiff line numberDiff line change
@@ -78,19 +78,15 @@
7878
//!
7979
//! [bounded MPMC queue]: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
8080
81-
use core::{cell::UnsafeCell, mem::MaybeUninit};
82-
8381
#[cfg(loom)]
8482
use loom::sync::atomic;
8583

8684
#[cfg(not(any(feature = "portable-atomic", loom)))]
8785
use core::sync::atomic;
88-
#[cfg(feature = "portable-atomic")]
86+
#[cfg(all(feature = "portable-atomic", not(loom)))]
8987
use portable_atomic as atomic;
9088

91-
use atomic::Ordering;
92-
93-
use crate::storage::{OwnedStorage, Storage, ViewStorage};
89+
use crate::storage::ViewStorage;
9490

9591
#[cfg(feature = "mpmc_large")]
9692
type AtomicTargetSize = atomic::AtomicUsize;
@@ -107,293 +103,21 @@ type IntSize = isize;
107103
#[cfg(not(feature = "mpmc_large"))]
108104
type IntSize = i8;
109105

110-
/// Base struct for [`Queue`] and [`QueueView`], generic over the [`Storage`].
111-
///
112-
/// In most cases you should use [`Queue`] or [`QueueView`] directly. Only use this
113-
/// struct if you want to write code that's generic over both.
114-
pub struct QueueInner<T, S: Storage> {
115-
dequeue_pos: AtomicTargetSize,
116-
enqueue_pos: AtomicTargetSize,
117-
buffer: UnsafeCell<S::Buffer<Cell<T>>>,
118-
}
106+
#[cfg(not(feature = "mpmc_crossbeam"))]
107+
mod original;
108+
#[cfg(not(feature = "mpmc_crossbeam"))]
109+
pub use original::*;
119110

120-
/// A statically allocated multi-producer, multi-consumer queue with a capacity of `N` elements.
121-
///
122-
/// <div class="warning">
123-
///
124-
/// `N` must be a power of 2.
125-
///
126-
/// </div>
127-
///
128-
/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
129-
///
130-
/// <div class="warning">
131-
///
132-
/// This implementation is not fully lock-free. If a thread or task gets preempted during
133-
/// a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
134-
/// it's scheduled again to finish its operation.
135-
///
136-
/// See <https://github.com/rust-embedded/heapless/issues/583> for more details.
137-
///
138-
/// </div>
139-
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
111+
#[cfg(feature = "mpmc_crossbeam")]
112+
mod crossbeam_array_queue;
113+
#[cfg(feature = "mpmc_crossbeam")]
114+
pub use crossbeam_array_queue::*;
140115

141116
/// A [`Queue`] with dynamic capacity.
142117
///
143118
/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference.
144119
pub type QueueView<T> = QueueInner<T, ViewStorage>;
145120

146-
impl<T, const N: usize> Queue<T, N> {
147-
#[cfg(not(loom))]
148-
/// Creates an empty queue.
149-
pub const fn new() -> Self {
150-
const {
151-
assert!(N > 1);
152-
assert!(N.is_power_of_two());
153-
assert!(N < UintSize::MAX as usize);
154-
}
155-
156-
let mut cell_count = 0;
157-
158-
let mut result_cells: [Cell<T>; N] = [const { Cell::new(0) }; N];
159-
while cell_count != N {
160-
result_cells[cell_count] = Cell::new(cell_count);
161-
cell_count += 1;
162-
}
163-
164-
Self {
165-
buffer: UnsafeCell::new(result_cells),
166-
dequeue_pos: AtomicTargetSize::new(0),
167-
enqueue_pos: AtomicTargetSize::new(0),
168-
}
169-
}
170-
171-
/// Creates an empty queue.
172-
#[cfg(loom)]
173-
pub fn new() -> Self {
174-
use core::array;
175-
176-
const {
177-
assert!(N > 1);
178-
assert!(N.is_power_of_two());
179-
assert!(N < UintSize::MAX as usize);
180-
}
181-
182-
let result_cells: [Cell<T>; N] = array::from_fn(|idx| Cell::new(idx));
183-
184-
Self {
185-
buffer: UnsafeCell::new(result_cells),
186-
dequeue_pos: AtomicTargetSize::new(0),
187-
enqueue_pos: AtomicTargetSize::new(0),
188-
}
189-
}
190-
191-
/// Used in `Storage` implementation.
192-
pub(crate) fn as_view_private(&self) -> &QueueView<T> {
193-
self
194-
}
195-
/// Used in `Storage` implementation.
196-
pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView<T> {
197-
self
198-
}
199-
}
200-
201-
impl<T, S: Storage> QueueInner<T, S> {
202-
/// Returns the maximum number of elements the queue can hold.
203-
#[inline]
204-
pub fn capacity(&self) -> usize {
205-
S::len(self.buffer.get())
206-
}
207-
208-
/// Get a reference to the `Queue`, erasing the `N` const-generic.
209-
///
210-
///
211-
/// ```rust
212-
/// # use heapless::mpmc::{Queue, QueueView};
213-
/// let queue: Queue<u8, 2> = Queue::new();
214-
/// let view: &QueueView<u8> = queue.as_view();
215-
/// ```
216-
///
217-
/// It is often preferable to do the same through type coerction, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
218-
///
219-
/// ```rust
220-
/// # use heapless::mpmc::{Queue, QueueView};
221-
/// let queue: Queue<u8, 2> = Queue::new();
222-
/// let view: &QueueView<u8> = &queue;
223-
/// ```
224-
#[inline]
225-
pub fn as_view(&self) -> &QueueView<T> {
226-
S::as_mpmc_view(self)
227-
}
228-
229-
/// Get a mutable reference to the `Queue`, erasing the `N` const-generic.
230-
///
231-
/// ```rust
232-
/// # use heapless::mpmc::{Queue, QueueView};
233-
/// let mut queue: Queue<u8, 2> = Queue::new();
234-
/// let view: &mut QueueView<u8> = queue.as_mut_view();
235-
/// ```
236-
///
237-
/// It is often preferable to do the same through type coerction, since `Queue<T, N>` implements `Unsize<QueueView<T>>`:
238-
///
239-
/// ```rust
240-
/// # use heapless::mpmc::{Queue, QueueView};
241-
/// let mut queue: Queue<u8, 2> = Queue::new();
242-
/// let view: &mut QueueView<u8> = &mut queue;
243-
/// ```
244-
#[inline]
245-
pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
246-
S::as_mpmc_mut_view(self)
247-
}
248-
249-
fn mask(&self) -> UintSize {
250-
(S::len(self.buffer.get()) - 1) as _
251-
}
252-
253-
/// Returns the item in the front of the queue, or `None` if the queue is empty.
254-
pub fn dequeue(&self) -> Option<T> {
255-
unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, self.mask()) }
256-
}
257-
258-
/// Adds an `item` to the end of the queue.
259-
///
260-
/// Returns back the `item` if the queue is full.
261-
pub fn enqueue(&self, item: T) -> Result<(), T> {
262-
unsafe {
263-
enqueue(
264-
S::as_ptr(self.buffer.get()),
265-
&self.enqueue_pos,
266-
self.mask(),
267-
item,
268-
)
269-
}
270-
}
271-
}
272-
273-
impl<T, const N: usize> Default for Queue<T, N> {
274-
fn default() -> Self {
275-
Self::new()
276-
}
277-
}
278-
279-
impl<T, S: Storage> Drop for QueueInner<T, S> {
280-
fn drop(&mut self) {
281-
// Drop all elements currently in the queue.
282-
while self.dequeue().is_some() {}
283-
}
284-
}
285-
286-
unsafe impl<T, S: Storage> Sync for QueueInner<T, S> where T: Send {}
287-
288-
struct Cell<T> {
289-
data: MaybeUninit<T>,
290-
sequence: AtomicTargetSize,
291-
}
292-
293-
impl<T> Cell<T> {
294-
#[cfg(not(loom))]
295-
const fn new(seq: usize) -> Self {
296-
Self {
297-
data: MaybeUninit::uninit(),
298-
sequence: AtomicTargetSize::new(seq as UintSize),
299-
}
300-
}
301-
#[cfg(loom)]
302-
fn new(seq: usize) -> Self {
303-
Self {
304-
data: MaybeUninit::uninit(),
305-
sequence: AtomicTargetSize::new(seq as UintSize),
306-
}
307-
}
308-
}
309-
310-
unsafe fn dequeue<T>(
311-
buffer: *mut Cell<T>,
312-
dequeue_pos: &AtomicTargetSize,
313-
mask: UintSize,
314-
) -> Option<T> {
315-
let mut pos = dequeue_pos.load(Ordering::Relaxed);
316-
317-
let mut cell;
318-
loop {
319-
cell = buffer.add(usize::from(pos & mask));
320-
let seq = (*cell).sequence.load(Ordering::Acquire);
321-
let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);
322-
323-
match dif.cmp(&0) {
324-
core::cmp::Ordering::Equal => {
325-
if dequeue_pos
326-
.compare_exchange_weak(
327-
pos,
328-
pos.wrapping_add(1),
329-
Ordering::Relaxed,
330-
Ordering::Relaxed,
331-
)
332-
.is_ok()
333-
{
334-
break;
335-
}
336-
}
337-
core::cmp::Ordering::Less => {
338-
return None;
339-
}
340-
core::cmp::Ordering::Greater => {
341-
pos = dequeue_pos.load(Ordering::Relaxed);
342-
}
343-
}
344-
}
345-
346-
let data = (*cell).data.as_ptr().read();
347-
(*cell)
348-
.sequence
349-
.store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
350-
Some(data)
351-
}
352-
353-
unsafe fn enqueue<T>(
354-
buffer: *mut Cell<T>,
355-
enqueue_pos: &AtomicTargetSize,
356-
mask: UintSize,
357-
item: T,
358-
) -> Result<(), T> {
359-
let mut pos = enqueue_pos.load(Ordering::Relaxed);
360-
361-
let mut cell;
362-
loop {
363-
cell = buffer.add(usize::from(pos & mask));
364-
let seq = (*cell).sequence.load(Ordering::Acquire);
365-
let dif = (seq as IntSize).wrapping_sub(pos as IntSize);
366-
367-
match dif.cmp(&0) {
368-
core::cmp::Ordering::Equal => {
369-
if enqueue_pos
370-
.compare_exchange_weak(
371-
pos,
372-
pos.wrapping_add(1),
373-
Ordering::Relaxed,
374-
Ordering::Relaxed,
375-
)
376-
.is_ok()
377-
{
378-
break;
379-
}
380-
}
381-
core::cmp::Ordering::Less => {
382-
return Err(item);
383-
}
384-
core::cmp::Ordering::Greater => {
385-
pos = enqueue_pos.load(Ordering::Relaxed);
386-
}
387-
}
388-
}
389-
390-
(*cell).data.as_mut_ptr().write(item);
391-
(*cell)
392-
.sequence
393-
.store(pos.wrapping_add(1), Ordering::Release);
394-
Ok(())
395-
}
396-
397121
#[cfg(not(loom))]
398122
#[cfg(test)]
399123
mod tests {

0 commit comments

Comments
 (0)