diff --git a/pkg/scheduling/v1/queuer.go b/pkg/scheduling/v1/queuer.go index d1e81bb372..b895d939ea 100644 --- a/pkg/scheduling/v1/queuer.go +++ b/pkg/scheduling/v1/queuer.go @@ -110,12 +110,12 @@ func (q *Queuer) queue(ctx context.Context) { go func() { defer q.queueMu.Unlock() - ctx, span := telemetry.NewSpan(ctx, "notify-queue") + telemetryCtx, span := telemetry.NewSpan(ctx, "notify-queue") defer span.End() telemetry.WithAttributes(span, telemetry.AttributeKV{Key: "tenant.id", Value: q.tenantId.String()}) - q.notifyQueueCh <- telemetry.GetCarrier(ctx) + q.notifyQueueCh <- telemetry.GetCarrier(telemetryCtx) }() } diff --git a/pkg/scheduling/v1/tenant_manager.go b/pkg/scheduling/v1/tenant_manager.go index 4ae55a1d33..160595d15c 100644 --- a/pkg/scheduling/v1/tenant_manager.go +++ b/pkg/scheduling/v1/tenant_manager.go @@ -386,21 +386,25 @@ func (t *tenantManager) notifyNewConcurrencyStrategy(ctx context.Context, strate } func (t *tenantManager) queue(ctx context.Context, queueNames []string) { - queueNamesMap := make(map[string]struct{}, len(queueNames)) - + t.queuersMu.RLock() + requested := make(map[string]struct{}, len(queueNames)) for _, name := range queueNames { - queueNamesMap[name] = struct{}{} + requested[name] = struct{}{} } - - t.queuersMu.RLock() - + // iterate t.queuers (not queueNames) to keep queueMu acquisition order consistent + // across goroutines, avoiding lock-ordering warnings from go-deadlock for _, q := range t.queuers { - if _, ok := queueNamesMap[q.queueName]; ok { + if _, ok := requested[q.queueName]; ok { q.queue(ctx) + delete(requested, q.queueName) } } - t.queuersMu.RUnlock() + + // this function sends to a channel that acquires the queuersMu, so needs to be outside lock. + for name := range requested { + t.notifyNewQueue(ctx, name) + } } type AssignedItemWithTask struct {