6161//! - The numbers reported correspond to the successful path, i.e. `dequeue` returning `Some`
6262//! and `enqueue` returning `Ok`.
6363//!
64+ //!
65+ //! <div class="warning">
66+ //!
67+ //! This implementation is not fully lock-free. If a thread or task gets preempted during
68+ //! a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
69+ //! it's scheduled again to finish its operation.
70+ //!
71+ //! See <https://github.com/rust-embedded/heapless/issues/583> for more details.
72+ //!
73+ //! </div>
6474//! # References
6575//!
6676//! This is an implementation of Dmitry Vyukov's [bounded MPMC queue], minus the
7080
7181use core:: { cell:: UnsafeCell , mem:: MaybeUninit } ;
7282
73- #[ cfg( not( feature = "portable-atomic" ) ) ]
83+ #[ cfg( loom) ]
84+ use loom:: sync:: atomic;
85+
86+ #[ cfg( all( loom, feature = "portable-atomic" ) ) ]
87+ compile_error ! ( "Loom can only be used in tests without portable-atomic" ) ;
88+
89+ #[ cfg( not( any( feature = "portable-atomic" , loom) ) ) ]
7490use core:: sync:: atomic;
7591#[ cfg( feature = "portable-atomic" ) ]
7692use portable_atomic as atomic;
@@ -113,6 +129,16 @@ pub struct QueueInner<T, S: Storage> {
113129/// </div>
114130///
115131/// The maximum value of `N` is 128 if the `mpmc_large` feature is not enabled.
132+ ///
133+ /// <div class="warning">
134+ ///
135+ /// This implementation is not fully lock-free. If a thread or task gets preempted during
136+ /// a `dequeue` or `enqueue` operation, it may prevent other operations from succeeding until
137+ /// it's scheduled again to finish its operation.
138+ ///
139+ /// See <https://github.com/rust-embedded/heapless/issues/583> for more details.
140+ ///
141+ /// </div>
116142pub type Queue < T , const N : usize > = QueueInner < T , OwnedStorage < N > > ;
117143
118144/// A [`Queue`] with dynamic capacity.
@@ -121,6 +147,7 @@ pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
121147pub type QueueView < T > = QueueInner < T , ViewStorage > ;
122148
123149impl < T , const N : usize > Queue < T , N > {
150+ #[ cfg( not( loom) ) ]
124151 /// Creates an empty queue.
125152 pub const fn new ( ) -> Self {
126153 const {
@@ -144,6 +171,26 @@ impl<T, const N: usize> Queue<T, N> {
144171 }
145172 }
146173
174+ /// Creates an empty queue.
175+ #[ cfg( loom) ]
176+ pub fn new ( ) -> Self {
177+ use core:: array;
178+
179+ const {
180+ assert ! ( N > 1 ) ;
181+ assert ! ( N . is_power_of_two( ) ) ;
182+ assert ! ( N < UintSize :: MAX as usize ) ;
183+ }
184+
185+ let result_cells: [ Cell < T > ; N ] = array:: from_fn ( |idx| Cell :: new ( idx) ) ;
186+
187+ Self {
188+ buffer : UnsafeCell :: new ( result_cells) ,
189+ dequeue_pos : AtomicTargetSize :: new ( 0 ) ,
190+ enqueue_pos : AtomicTargetSize :: new ( 0 ) ,
191+ }
192+ }
193+
147194 /// Used in `Storage` implementation.
148195 pub ( crate ) fn as_view_private ( & self ) -> & QueueView < T > {
149196 self
@@ -247,12 +294,20 @@ struct Cell<T> {
247294}
248295
249296impl < T > Cell < T > {
297+ #[ cfg( not( loom) ) ]
250298 const fn new ( seq : usize ) -> Self {
251299 Self {
252300 data : MaybeUninit :: uninit ( ) ,
253301 sequence : AtomicTargetSize :: new ( seq as UintSize ) ,
254302 }
255303 }
304+ #[ cfg( loom) ]
305+ fn new ( seq : usize ) -> Self {
306+ Self {
307+ data : MaybeUninit :: uninit ( ) ,
308+ sequence : AtomicTargetSize :: new ( seq as UintSize ) ,
309+ }
310+ }
256311}
257312
258313unsafe fn dequeue < T > (
@@ -286,6 +341,8 @@ unsafe fn dequeue<T>(
286341 return None ;
287342 }
288343 core:: cmp:: Ordering :: Greater => {
344+ #[ cfg( loom) ]
345+ loom:: hint:: spin_loop ( ) ;
289346 pos = dequeue_pos. load ( Ordering :: Relaxed ) ;
290347 }
291348 }
@@ -330,6 +387,8 @@ unsafe fn enqueue<T>(
330387 return Err ( item) ;
331388 }
332389 core:: cmp:: Ordering :: Greater => {
390+ #[ cfg( loom) ]
391+ loom:: hint:: spin_loop ( ) ;
333392 pos = enqueue_pos. load ( Ordering :: Relaxed ) ;
334393 }
335394 }
@@ -342,6 +401,7 @@ unsafe fn enqueue<T>(
342401 Ok ( ( ) )
343402}
344403
404+ #[ cfg( not( loom) ) ]
345405#[ cfg( test) ]
346406mod tests {
347407 use static_assertions:: assert_not_impl_any;
@@ -420,3 +480,70 @@ mod tests {
420480 q. enqueue ( 0x55 ) . unwrap_err ( ) ;
421481 }
422482}
483+ #[ cfg( all( loom, test) ) ]
484+ mod tests_loom {
485+ use super :: * ;
486+ use std:: sync:: Arc ;
487+ const N : usize = 4 ;
488+
489+ // FIXME: This test should pass. See https://github.com/rust-embedded/heapless/issues/583
490+ #[ test]
491+ #[ cfg( loom) ]
492+ #[ should_panic( expected = "Test failed" ) ]
493+ fn loom_issue_583_enqueue ( ) {
494+ loom:: model ( || {
495+ let q0 = Arc :: new ( Queue :: < u8 , N > :: new ( ) ) ;
496+ for i in 0 ..N {
497+ q0. enqueue ( i as u8 ) . unwrap ( ) ;
498+ }
499+ let model_thread = || {
500+ let q0 = q0. clone ( ) ;
501+ move || {
502+ for k in 0 ..N + 2 {
503+ let Some ( i) = q0. dequeue ( ) else {
504+ return ;
505+ panic ! ( "Test failed at dequeue" ) ;
506+ } ;
507+ if q0. enqueue ( k as u8 ) . is_err ( ) {
508+ panic ! ( "Test failed at enqueue: {i}" ) ;
509+ }
510+ }
511+ }
512+ } ;
513+
514+ let h1 = loom:: thread:: spawn ( model_thread ( ) ) ;
515+ let h2 = loom:: thread:: spawn ( model_thread ( ) ) ;
516+ h1. join ( ) . unwrap ( ) ;
517+ h2. join ( ) . unwrap ( ) ;
518+ } ) ;
519+ }
520+
521+ // FIXME: This test should pass. See https://github.com/rust-embedded/heapless/issues/583
522+ #[ test]
523+ #[ cfg( loom) ]
524+ #[ should_panic( expected = "Test failed" ) ]
525+ fn loom_issue_583_dequeue ( ) {
526+ loom:: model ( || {
527+ let q0 = Arc :: new ( Queue :: < u8 , N > :: new ( ) ) ;
528+ let model_thread = || {
529+ let q0 = q0. clone ( ) ;
530+ move || {
531+ for k in 0 ..N + 2 {
532+ if q0. enqueue ( k as u8 ) . is_err ( ) {
533+ panic ! ( "Test failed at enqueue: {k}" ) ;
534+ }
535+ if q0. dequeue ( ) . is_none ( ) {
536+ return ;
537+ panic ! ( "Test failed at dequeue: {k}" ) ;
538+ }
539+ }
540+ }
541+ } ;
542+
543+ let h1 = loom:: thread:: spawn ( model_thread ( ) ) ;
544+ let h2 = loom:: thread:: spawn ( model_thread ( ) ) ;
545+ h1. join ( ) . unwrap ( ) ;
546+ h2. join ( ) . unwrap ( ) ;
547+ } ) ;
548+ }
549+ }
0 commit comments