11package  priorityqueue
22
33import  (
4+ 	"math" 
45	"sync" 
56	"sync/atomic" 
67	"time" 
@@ -206,17 +207,18 @@ func (w *priorityqueue[T]) spin() {
206207	blockForever  :=  make (chan  time.Time )
207208	var  nextReady  <- chan  time.Time 
208209	nextReady  =  blockForever 
210+ 	var  nextItemReadyAt  time.Time 
209211
210212	for  {
211213		select  {
212214		case  <- w .done :
213215			return 
214216		case  <- w .itemOrWaiterAdded :
215217		case  <- nextReady :
218+ 			nextReady  =  blockForever 
219+ 			nextItemReadyAt  =  time.Time {}
216220		}
217221
218- 		nextReady  =  blockForever 
219- 
220222		func () {
221223			w .lock .Lock ()
222224			defer  w .lock .Unlock ()
@@ -227,39 +229,67 @@ func (w *priorityqueue[T]) spin() {
227229			// manipulating the tree from within Ascend might lead to panics, so 
228230			// track what we want to delete and do it after we are done ascending. 
229231			var  toDelete  []* item [T ]
230- 			w .queue .Ascend (func (item  * item [T ]) bool  {
231- 				if  item .ReadyAt  !=  nil  {
232- 					if  readyAt  :=  item .ReadyAt .Sub (w .now ()); readyAt  >  0  {
233- 						nextReady  =  w .tick (readyAt )
234- 						return  false 
232+ 
233+ 			var  key  T 
234+ 
235+ 			// Items in the queue tree are sorted first by priority and second by readiness, so 
236+ 			// items with a lower priority might be ready further down in the queue. 
237+ 			// We iterate through the priorities high to low until we find a ready item 
238+ 			pivot  :=  item [T ]{
239+ 				Key :          key ,
240+ 				AddedCounter : 0 ,
241+ 				Priority :     math .MaxInt ,
242+ 				ReadyAt :      nil ,
243+ 			}
244+ 
245+ 			for  {
246+ 				pivotChange  :=  false 
247+ 
248+ 				w .queue .AscendGreaterOrEqual (& pivot , func (item  * item [T ]) bool  {
249+ 					// Item is locked, we can not hand it out 
250+ 					if  w .locked .Has (item .Key ) {
251+ 						return  true 
235252					}
236- 					if  ! w .becameReady .Has (item .Key ) {
237- 						w .metrics .add (item .Key , item .Priority )
238- 						w .becameReady .Insert (item .Key )
253+ 
254+ 					if  item .ReadyAt  !=  nil  {
255+ 						if  readyAt  :=  item .ReadyAt .Sub (w .now ()); readyAt  >  0  {
256+ 							if  nextItemReadyAt .After (* item .ReadyAt ) ||  nextItemReadyAt .IsZero () {
257+ 								nextReady  =  w .tick (readyAt )
258+ 								nextItemReadyAt  =  * item .ReadyAt 
259+ 							}
260+ 
261+ 							// Adjusting the pivot item moves the ascend to the next lower priority 
262+ 							pivot .Priority  =  item .Priority  -  1 
263+ 							pivotChange  =  true 
264+ 							return  false 
265+ 						}
266+ 						if  ! w .becameReady .Has (item .Key ) {
267+ 							w .metrics .add (item .Key , item .Priority )
268+ 							w .becameReady .Insert (item .Key )
269+ 						}
239270					}
240- 				}
241271
242- 				if  w .waiters .Load () ==  0  {
243- 					// Have to keep iterating here to ensure we update metrics 
244- 					// for further items that became ready and set nextReady. 
245- 					return  true 
246- 				}
272+ 					 if  w .waiters .Load () ==  0  {
273+ 						 // Have to keep iterating here to ensure we update metrics 
274+ 						 // for further items that became ready and set nextReady. 
275+ 						 return  true 
276+ 					 }
247277
248- 				// Item is locked, we can not hand it out 
249- 				if  w .locked .Has (item .Key ) {
250- 					return  true 
251- 				}
278+ 					w .metrics .get (item .Key , item .Priority )
279+ 					w .locked .Insert (item .Key )
280+ 					w .waiters .Add (- 1 )
281+ 					delete (w .items , item .Key )
282+ 					toDelete  =  append (toDelete , item )
283+ 					w .becameReady .Delete (item .Key )
284+ 					w .get  <-  * item 
252285
253- 				w .metrics .get (item .Key , item .Priority )
254- 				w .locked .Insert (item .Key )
255- 				w .waiters .Add (- 1 )
256- 				delete (w .items , item .Key )
257- 				toDelete  =  append (toDelete , item )
258- 				w .becameReady .Delete (item .Key )
259- 				w .get  <-  * item 
286+ 					return  true 
287+ 				})
260288
261- 				return  true 
262- 			})
289+ 				if  ! pivotChange  {
290+ 					break 
291+ 				}
292+ 			}
263293
264294			for  _ , item  :=  range  toDelete  {
265295				w .queue .Delete (item )
@@ -387,6 +417,9 @@ func (w *priorityqueue[T]) logState() {
387417}
388418
389419func  less [T  comparable ](a , b  * item [T ]) bool  {
420+ 	if  a .Priority  !=  b .Priority  {
421+ 		return  a .Priority  >  b .Priority 
422+ 	}
390423	if  a .ReadyAt  ==  nil  &&  b .ReadyAt  !=  nil  {
391424		return  true 
392425	}
@@ -396,9 +429,6 @@ func less[T comparable](a, b *item[T]) bool {
396429	if  a .ReadyAt  !=  nil  &&  b .ReadyAt  !=  nil  &&  ! a .ReadyAt .Equal (* b .ReadyAt ) {
397430		return  a .ReadyAt .Before (* b .ReadyAt )
398431	}
399- 	if  a .Priority  !=  b .Priority  {
400- 		return  a .Priority  >  b .Priority 
401- 	}
402432
403433	return  a .AddedCounter  <  b .AddedCounter 
404434}
@@ -426,4 +456,5 @@ type bTree[T any] interface {
426456	ReplaceOrInsert (item  T ) (_  T , _  bool )
427457	Delete (item  T ) (T , bool )
428458	Ascend (iterator  btree.ItemIteratorG [T ])
459+ 	AscendGreaterOrEqual (pivot  T , iterator  btree.ItemIteratorG [T ])
429460}
0 commit comments