From c8745c3d1cf5f8d456d6f551a5a1037a0a639bbe Mon Sep 17 00:00:00 2001 From: Julius Park Date: Tue, 5 May 2026 15:53:16 -0400 Subject: [PATCH 1/5] initial commit --- pkg/scheduling/v1/tenant_manager.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/scheduling/v1/tenant_manager.go b/pkg/scheduling/v1/tenant_manager.go index 4ae55a1d33..d7a67cb23b 100644 --- a/pkg/scheduling/v1/tenant_manager.go +++ b/pkg/scheduling/v1/tenant_manager.go @@ -386,17 +386,19 @@ func (t *tenantManager) notifyNewConcurrencyStrategy(ctx context.Context, strate } func (t *tenantManager) queue(ctx context.Context, queueNames []string) { - queueNamesMap := make(map[string]struct{}, len(queueNames)) - - for _, name := range queueNames { - queueNamesMap[name] = struct{}{} - } + queueNamesMap := make(map[string]*Queuer, len(t.queuers)) t.queuersMu.RLock() - for _, q := range t.queuers { - if _, ok := queueNamesMap[q.queueName]; ok { + queueNamesMap[q.queueName] = q + } + for _, name := range queueNames { + if q, ok := queueNamesMap[name]; ok { + // queue already exists q.queue(ctx) + } else { + // new or inactive queue, create it + t.notifyNewQueue(ctx, name) } } From 3c8f03f78d636d73632cd278b1fb55b561f51f79 Mon Sep 17 00:00:00 2001 From: Julius Park Date: Tue, 5 May 2026 17:10:23 -0400 Subject: [PATCH 2/5] move lock up --- pkg/scheduling/v1/tenant_manager.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/scheduling/v1/tenant_manager.go b/pkg/scheduling/v1/tenant_manager.go index d7a67cb23b..9b80b29757 100644 --- a/pkg/scheduling/v1/tenant_manager.go +++ b/pkg/scheduling/v1/tenant_manager.go @@ -386,9 +386,8 @@ func (t *tenantManager) notifyNewConcurrencyStrategy(ctx context.Context, strate } func (t *tenantManager) queue(ctx context.Context, queueNames []string) { - queueNamesMap := make(map[string]*Queuer, len(t.queuers)) - t.queuersMu.RLock() + queueNamesMap := make(map[string]*Queuer, len(t.queuers)) for _, q := range t.queuers { queueNamesMap[q.queueName] = q } From 6885a2c467693148abeeb32096faba78d593de94 Mon Sep 17 00:00:00 2001 From: Julius Park Date: Tue, 5 May 2026 17:40:27 -0400 Subject: [PATCH 3/5] change the locks up again --- pkg/scheduling/v1/tenant_manager.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/pkg/scheduling/v1/tenant_manager.go b/pkg/scheduling/v1/tenant_manager.go index 9b80b29757..de9ba526b3 100644 --- a/pkg/scheduling/v1/tenant_manager.go +++ b/pkg/scheduling/v1/tenant_manager.go @@ -388,6 +388,7 @@ func (t *tenantManager) notifyNewConcurrencyStrategy(ctx context.Context, strate func (t *tenantManager) queue(ctx context.Context, queueNames []string) { t.queuersMu.RLock() queueNamesMap := make(map[string]*Queuer, len(t.queuers)) + inactiveQueues := make([]string, 0) for _, q := range t.queuers { queueNamesMap[q.queueName] = q } @@ -396,12 +397,15 @@ func (t *tenantManager) queue(ctx context.Context, queueNames []string) { // queue already exists q.queue(ctx) } else { - // new or inactive queue, create it - t.notifyNewQueue(ctx, name) + inactiveQueues = append(inactiveQueues, name) } } - t.queuersMu.RUnlock() + + // this function sends to a channel that acquires the queuersMu, so needs to be outside lock. + for _, name := range inactiveQueues { + t.notifyNewQueue(ctx, name) + } } type AssignedItemWithTask struct { From eecd2a06a35f2fb983a9d1fa1b6a665d100de677 Mon Sep 17 00:00:00 2001 From: Julius Park Date: Wed, 6 May 2026 09:34:35 -0400 Subject: [PATCH 4/5] try this to resolve deadlock --- pkg/scheduling/v1/queuer.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/pkg/scheduling/v1/queuer.go b/pkg/scheduling/v1/queuer.go index d1e81bb372..734b50d371 100644 --- a/pkg/scheduling/v1/queuer.go +++ b/pkg/scheduling/v1/queuer.go @@ -103,20 +103,15 @@ func (q *Queuer) Cleanup() { } func (q *Queuer) queue(ctx context.Context) { - if ok := q.queueMu.TryLock(); !ok { - return - } - - go func() { - defer q.queueMu.Unlock() - - ctx, span := telemetry.NewSpan(ctx, "notify-queue") - defer span.End() + ctx, span := telemetry.NewSpan(ctx, "notify-queue") + defer span.End() - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId.String()}) + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId.String()}) - q.notifyQueueCh <- telemetry.GetCarrier(ctx) - }() + select { + case q.notifyQueueCh <- telemetry.GetCarrier(ctx): + default: + } } func (q *Queuer) loopQueue(ctx context.Context) { From dea5761f9544624e4e2ed50d832c4237fc2b6fb6 Mon Sep 17 00:00:00 2001 From: Julius Park Date: Wed, 6 May 2026 10:30:03 -0400 Subject: [PATCH 5/5] this should actually fix deadlock --- pkg/scheduling/v1/queuer.go | 19 ++++++++++++------- pkg/scheduling/v1/tenant_manager.go | 19 +++++++++---------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/pkg/scheduling/v1/queuer.go b/pkg/scheduling/v1/queuer.go index 734b50d371..b895d939ea 100644 --- a/pkg/scheduling/v1/queuer.go +++ b/pkg/scheduling/v1/queuer.go @@ -103,15 +103,20 @@ func (q *Queuer) Cleanup() { } func (q *Queuer) queue(ctx context.Context) { - ctx, span := telemetry.NewSpan(ctx, "notify-queue") - defer span.End() + if ok := q.queueMu.TryLock(); !ok { + return + } - telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId.String()}) + go func() { + defer q.queueMu.Unlock() - select { - case q.notifyQueueCh <- telemetry.GetCarrier(ctx): - default: - } + telemetryCtx, span := telemetry.NewSpan(ctx, "notify-queue") + defer span.End() + + telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId.String()}) + + q.notifyQueueCh <- telemetry.GetCarrier(telemetryCtx) + }() } func (q *Queuer) loopQueue(ctx context.Context) { diff --git a/pkg/scheduling/v1/tenant_manager.go b/pkg/scheduling/v1/tenant_manager.go index de9ba526b3..160595d15c 100644 --- a/pkg/scheduling/v1/tenant_manager.go +++ b/pkg/scheduling/v1/tenant_manager.go @@ -387,23 +387,22 @@ func (t *tenantManager) notifyNewConcurrencyStrategy(ctx context.Context, strate func (t *tenantManager) queue(ctx context.Context, queueNames []string) { t.queuersMu.RLock() - queueNamesMap := make(map[string]*Queuer, len(t.queuers)) - inactiveQueues := make([]string, 0) - for _, q := range t.queuers { - queueNamesMap[q.queueName] = q - } + requested := make(map[string]struct{}, len(queueNames)) for _, name := range queueNames { - if q, ok := queueNamesMap[name]; ok { - // queue already exists + requested[name] = struct{}{} + } + // iterate t.queuers (not queueNames) to keep queueMu acquisition order consistent + // across goroutines, avoiding lock-ordering warnings from go-deadlock + for _, q := range t.queuers { + if _, ok := requested[q.queueName]; ok { q.queue(ctx) - } else { - inactiveQueues = append(inactiveQueues, name) + delete(requested, q.queueName) } } t.queuersMu.RUnlock() // this function sends to a channel that acquires the queuersMu, so needs to be outside lock. - for _, name := range inactiveQueues { + for name := range requested { t.notifyNewQueue(ctx, name) } }