@@ -99,20 +99,27 @@ type UintSize = usize;
9999type  UintSize  = u8 ; 
100100
101101#[ cfg( feature = "mpmc_large" ) ]  
102+ #[ allow( unused) ]  
102103type  IntSize  = isize ; 
103104#[ cfg( not( feature = "mpmc_large" ) ) ]  
105+ #[ allow( unused) ]  
104106type  IntSize  = i8 ; 
105107
106- #[ cfg( not( feature = "mpmc_crossbeam" ) ) ]  
108+ #[ cfg( all ( not( feature = "mpmc_nblfq"  ) ,  not ( feature =  " mpmc_crossbeam") ) ) ]  
107109mod  original; 
108- #[ cfg( not( feature = "mpmc_crossbeam" ) ) ]  
110+ #[ cfg( all ( not( feature = "mpmc_nblfq"  ) ,  not ( feature =  " mpmc_crossbeam") ) ) ]  
109111pub  use  original:: * ; 
110112
111113#[ cfg( feature = "mpmc_crossbeam" ) ]  
112114mod  crossbeam_array_queue; 
113115#[ cfg( feature = "mpmc_crossbeam" ) ]  
114116pub  use  crossbeam_array_queue:: * ; 
115117
118+ #[ cfg( all( feature = "mpmc_nblfq" ,  not( feature = "mpmc_crossbeam" ) ) ) ]  
119+ mod  nblfq; 
120+ #[ cfg( all( feature = "mpmc_nblfq" ,  not( feature = "mpmc_crossbeam" ) ) ) ]  
121+ pub  use  nblfq:: * ; 
122+ 
116123/// A [`Queue`] with dynamic capacity. 
117124/// 
118125/// [`Queue`] coerces to `QueueView`. `QueueView` is `!Sized`, meaning it can only ever be used by reference. 
@@ -125,6 +132,8 @@ mod tests {
125132
126133    use  super :: Queue ; 
127134
135+     const  N :  usize  = 4 ; 
136+ 
128137    // Ensure a `Queue` containing `!Send` values stays `!Send` itself. 
129138    assert_not_impl_any ! ( Queue <* const  ( ) ,  4 >:  Send ) ; 
130139
@@ -196,7 +205,153 @@ mod tests {
196205        // Queue is full, this should not block forever. 
197206        q. enqueue ( 0x55 ) . unwrap_err ( ) ; 
198207    } 
208+ 
209+     #[ test]  
210+     fn  test_enqueue_contention_rt ( )  { 
211+         use  thread_priority:: * ; 
212+ 
213+         let  q0 = std:: sync:: Arc :: new ( Queue :: < u8 ,  N > :: new ( ) ) ; 
214+ 
215+         for  i in  0 ..N  { 
216+             q0. enqueue ( i as  u8 ) . expect ( "new enqueue" ) ; 
217+         } 
218+ 
219+         let  model_thread = |q0 :  std:: sync:: Arc < Queue < u8 ,  N > > | { 
220+             for  k in  0 ..N  { 
221+                 match  q0. dequeue ( )  { 
222+                     Some ( _i)  => ( ) , 
223+                     None  if  q0. is_empty ( )  => ( ) , 
224+                     None  => panic ! ( 
225+                         "enqueue: Dequeue failed on iteration: {k}, empty queue?: {}, queue len: {}" , 
226+                         q0. is_empty( ) , 
227+                         q0. len( ) 
228+                     ) , 
229+                 } ; 
230+ 
231+                 q0. enqueue ( k as  u8 ) . unwrap ( ) ; 
232+             } 
233+         } ; 
234+ 
235+         let  q1 = q0. clone ( ) ; 
236+         let  h1 = ThreadBuilder :: default ( ) 
237+             . name ( "h1" ) 
238+             . priority ( ThreadPriority :: Max ) 
239+             . policy ( ThreadSchedulePolicy :: Realtime ( 
240+                 RealtimeThreadSchedulePolicy :: Fifo , 
241+             ) ) 
242+             . spawn ( move  |_| model_thread ( q1) ) 
243+             . unwrap ( ) ; 
244+ 
245+         let  q2 = q0. clone ( ) ; 
246+         let  h2 = ThreadBuilder :: default ( ) 
247+             . name ( "h2" ) 
248+             . priority ( ThreadPriority :: Max ) 
249+             . policy ( ThreadSchedulePolicy :: Realtime ( 
250+                 RealtimeThreadSchedulePolicy :: Fifo , 
251+             ) ) 
252+             . spawn ( move  |_| model_thread ( q2) ) 
253+             . unwrap ( ) ; 
254+ 
255+         h1. join ( ) . unwrap ( ) ; 
256+         h2. join ( ) . unwrap ( ) ; 
257+     } 
258+ 
259+     #[ test]  
260+     fn  test_dequeue_contention_rt ( )  { 
261+         use  thread_priority:: * ; 
262+ 
263+         let  q0 = std:: sync:: Arc :: new ( Queue :: < u8 ,  N > :: new ( ) ) ; 
264+ 
265+         let  model_thread = |q0 :  std:: sync:: Arc < Queue < u8 ,  N > > | { 
266+             for  k in  0 ..N  { 
267+                 q0. enqueue ( k as  u8 ) . unwrap ( ) ; 
268+                 match  q0. dequeue ( )  { 
269+                     Some ( _i)  => ( ) , 
270+                     None  if  q0. is_empty ( )  => ( ) , 
271+                     None  => { 
272+                         panic ! ( 
273+                             "dequeue: Dequeue failed on iteration: {k}, queue is empty?: {}, queue len: {}" , 
274+                             q0. is_empty( ) , 
275+                             q0. len( ) 
276+                         ) ; 
277+                     } 
278+                 } 
279+             } 
280+         } ; 
281+ 
282+         let  q1 = q0. clone ( ) ; 
283+         let  h1 = ThreadBuilder :: default ( ) 
284+             . name ( "h1" ) 
285+             . priority ( ThreadPriority :: Max ) 
286+             . policy ( ThreadSchedulePolicy :: Realtime ( 
287+                 RealtimeThreadSchedulePolicy :: Fifo , 
288+             ) ) 
289+             . spawn ( move  |_| model_thread ( q1) ) 
290+             . unwrap ( ) ; 
291+ 
292+         let  q2 = q0. clone ( ) ; 
293+         let  h2 = ThreadBuilder :: default ( ) 
294+             . name ( "h2" ) 
295+             . priority ( ThreadPriority :: Max ) 
296+             . policy ( ThreadSchedulePolicy :: Realtime ( 
297+                 RealtimeThreadSchedulePolicy :: Fifo , 
298+             ) ) 
299+             . spawn ( move  |_| model_thread ( q2) ) 
300+             . unwrap ( ) ; 
301+ 
302+         h1. join ( ) . unwrap ( ) ; 
303+         h2. join ( ) . unwrap ( ) ; 
304+     } 
305+ 
306+     #[ test]  
307+     fn  test_issue_583_enqueue_rt ( )  { 
308+         use  thread_priority:: * ; 
309+ 
310+         fn  to_vec < T > ( q :  & Queue < T ,  N > )  -> Vec < T >  { 
311+             // inaccurate 
312+             let  mut  ret = vec ! [ ] ; 
313+             while  let  Some ( v)  = q. dequeue ( )  { 
314+                 ret. push ( v) ; 
315+             } 
316+             ret
317+         } 
318+ 
319+         let  q0 = std:: sync:: Arc :: new ( Queue :: < u8 ,  N > :: new ( ) ) ; 
320+ 
321+         let  model_thread = move  |q0 :  std:: sync:: Arc < Queue < u8 ,  N > > | { 
322+             for  k in  0 ..1_000_000  { 
323+                 if  let  Some ( v)  = q0. dequeue ( )  { 
324+                     q0. enqueue ( v) 
325+                         . unwrap_or_else ( |v| panic ! ( "{}: q0 -> q0: {}, {:?}" ,  k,  v,  to_vec( & q0) ) ) ; 
326+                 } 
327+             } 
328+         } ; 
329+ 
330+         let  q1 = q0. clone ( ) ; 
331+         let  h1 = ThreadBuilder :: default ( ) 
332+             . name ( "h1" ) 
333+             . priority ( ThreadPriority :: Max ) 
334+             . policy ( ThreadSchedulePolicy :: Realtime ( 
335+                 RealtimeThreadSchedulePolicy :: Fifo , 
336+             ) ) 
337+             . spawn ( move  |_| model_thread ( q1) ) 
338+             . unwrap ( ) ; 
339+ 
340+         let  q2 = q0. clone ( ) ; 
341+         let  h2 = ThreadBuilder :: default ( ) 
342+             . name ( "h2" ) 
343+             . priority ( ThreadPriority :: Max ) 
344+             . policy ( ThreadSchedulePolicy :: Realtime ( 
345+                 RealtimeThreadSchedulePolicy :: Fifo , 
346+             ) ) 
347+             . spawn ( move  |_| model_thread ( q2) ) 
348+             . unwrap ( ) ; 
349+ 
350+         h1. join ( ) . unwrap ( ) ; 
351+         h2. join ( ) . unwrap ( ) ; 
352+     } 
199353} 
354+ 
200355#[ cfg( all( loom,  test) ) ]  
201356mod  tests_loom { 
202357    use  super :: * ; 
0 commit comments