From 051b0fb779e6e11bf5419323cbe448cf085750a2 Mon Sep 17 00:00:00 2001 From: srikary12 Date: Thu, 30 Oct 2025 19:22:54 +0530 Subject: [PATCH] refactor: remove unnecessary mutex locks in subject implementations --- subject_async.go | 24 ------------------------ subject_behavior.go | 24 ------------------------ subject_publish.go | 24 ------------------------ subject_replay.go | 24 ------------------------ subject_unicast.go | 35 ----------------------------------- 5 files changed, 131 deletions(-) diff --git a/subject_async.go b/subject_async.go index b094ca9..f92b823 100644 --- a/subject_async.go +++ b/subject_async.go @@ -28,7 +28,6 @@ var _ Subject[int] = (*asyncSubjectImpl[int])(nil) // The emitted value or error is stored for future subscriptions. 0 or 1 value is emitted. func NewAsyncSubject[T any]() Subject[T] { return &asyncSubjectImpl[T]{ - mu: sync.Mutex{}, status: 0, observers: sync.Map{}, @@ -41,7 +40,6 @@ func NewAsyncSubject[T any]() Subject[T] { } type asyncSubjectImpl[T any] struct { - mu sync.Mutex // sync.RWMutex would be better, but it is too slow for high-volume subjects status Kind observers sync.Map @@ -61,9 +59,6 @@ func (s *asyncSubjectImpl[T]) Subscribe(destination Observer[T]) Subscription { func (s *asyncSubjectImpl[T]) SubscribeWithContext(subscriberCtx context.Context, destination Observer[T]) Subscription { subscription := NewSubscriber(destination) - s.mu.Lock() - defer s.mu.Unlock() - switch s.status { case KindNext: // fallthrough @@ -104,16 +99,12 @@ func (s *asyncSubjectImpl[T]) Next(value T) { // Implements Observer. func (s *asyncSubjectImpl[T]) NextWithContext(ctx context.Context, value T) { - s.mu.Lock() - if s.status == KindNext { s.hasValue = true s.value = lo.T2(ctx, value) // A previous value might be erased. It won't be forwarded to `OnDroppedNotification`. } else { OnDroppedNotification(ctx, NewNotificationNext(value)) } - - s.mu.Unlock() } // Implements Observer. @@ -123,8 +114,6 @@ func (s *asyncSubjectImpl[T]) Error(err error) { // Implements Observer. func (s *asyncSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) { - s.mu.Lock() - if s.status == KindNext { s.err = lo.T2(ctx, err) s.status = KindError @@ -133,7 +122,6 @@ func (s *asyncSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) { OnDroppedNotification(ctx, NewNotificationError[T](err)) } - s.mu.Unlock() s.unsubscribeAll() } @@ -144,8 +132,6 @@ func (s *asyncSubjectImpl[T]) Complete() { // Implements Observer. func (s *asyncSubjectImpl[T]) CompleteWithContext(ctx context.Context) { - s.mu.Lock() - if s.status == KindNext { s.status = KindComplete if s.hasValue { @@ -157,7 +143,6 @@ func (s *asyncSubjectImpl[T]) CompleteWithContext(ctx context.Context) { OnDroppedNotification(ctx, NewNotificationComplete[T]()) } - s.mu.Unlock() s.unsubscribeAll() } @@ -185,25 +170,16 @@ func (s *asyncSubjectImpl[T]) CountObservers() int { // Implements Observer. func (s *asyncSubjectImpl[T]) IsClosed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status != KindNext } // Implements Observer. func (s *asyncSubjectImpl[T]) HasThrown() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindError } // Implements Observer. func (s *asyncSubjectImpl[T]) IsCompleted() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindComplete } diff --git a/subject_behavior.go b/subject_behavior.go index ff3ea11..2ea8c9a 100644 --- a/subject_behavior.go +++ b/subject_behavior.go @@ -28,7 +28,6 @@ var _ Subject[int] = (*behaviorSubjectImpl[int])(nil) // After completion, new subscription won't receive the last value, but the error will eventually propagated. func NewBehaviorSubject[T any](initial T) Subject[T] { return &behaviorSubjectImpl[T]{ - mu: sync.Mutex{}, status: KindNext, observers: sync.Map{}, @@ -40,7 +39,6 @@ func NewBehaviorSubject[T any](initial T) Subject[T] { } type behaviorSubjectImpl[T any] struct { - mu sync.Mutex // sync.RWMutex would be better, but it is too slow for high-volume subjects status Kind observers sync.Map @@ -59,9 +57,6 @@ func (s *behaviorSubjectImpl[T]) Subscribe(destination Observer[T]) Subscription func (s *behaviorSubjectImpl[T]) SubscribeWithContext(subscriberCtx context.Context, destination Observer[T]) Subscription { subscription := NewSubscriber(destination) - s.mu.Lock() - defer s.mu.Unlock() - switch s.status { case KindNext: // fallthrough @@ -100,16 +95,12 @@ func (s *behaviorSubjectImpl[T]) Next(value T) { // Implements Observer. func (s *behaviorSubjectImpl[T]) NextWithContext(ctx context.Context, value T) { - s.mu.Lock() - if s.status == KindNext { s.last = lo.T2(ctx, value) s.broadcastNext(ctx, value) } else { OnDroppedNotification(ctx, NewNotificationNext(value)) } - - s.mu.Unlock() } // Implements Observer. @@ -119,8 +110,6 @@ func (s *behaviorSubjectImpl[T]) Error(err error) { // Implements Observer. func (s *behaviorSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) { - s.mu.Lock() - if s.status == KindNext { s.err = lo.T2(ctx, err) s.status = KindError @@ -129,7 +118,6 @@ func (s *behaviorSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error OnDroppedNotification(ctx, NewNotificationError[T](err)) } - s.mu.Unlock() s.unsubscribeAll() } @@ -140,8 +128,6 @@ func (s *behaviorSubjectImpl[T]) Complete() { // Implements Observer. func (s *behaviorSubjectImpl[T]) CompleteWithContext(ctx context.Context) { - s.mu.Lock() - if s.status == KindNext { s.status = KindComplete s.broadcastComplete(ctx) @@ -149,7 +135,6 @@ func (s *behaviorSubjectImpl[T]) CompleteWithContext(ctx context.Context) { OnDroppedNotification(ctx, NewNotificationComplete[T]()) } - s.mu.Unlock() s.unsubscribeAll() } @@ -177,25 +162,16 @@ func (s *behaviorSubjectImpl[T]) CountObservers() int { // Implements Observer. func (s *behaviorSubjectImpl[T]) IsClosed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status != KindNext } // Implements Observer. func (s *behaviorSubjectImpl[T]) HasThrown() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindError } // Implements Observer. func (s *behaviorSubjectImpl[T]) IsCompleted() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindComplete } diff --git a/subject_publish.go b/subject_publish.go index a3d2a48..2ebfc3d 100644 --- a/subject_publish.go +++ b/subject_publish.go @@ -28,7 +28,6 @@ var _ Subject[int] = (*publishSubjectImpl[int])(nil) // Values received before subscription are not transmitted. func NewPublishSubject[T any]() Subject[T] { return &publishSubjectImpl[T]{ - mu: sync.Mutex{}, status: KindNext, observers: sync.Map{}, @@ -39,7 +38,6 @@ func NewPublishSubject[T any]() Subject[T] { } type publishSubjectImpl[T any] struct { - mu sync.Mutex // sync.RWMutex would be better, but it is too slow for high-volume subjects status Kind observers sync.Map @@ -57,9 +55,6 @@ func (s *publishSubjectImpl[T]) Subscribe(destination Observer[T]) Subscription func (s *publishSubjectImpl[T]) SubscribeWithContext(subscriberCtx context.Context, destination Observer[T]) Subscription { subscription := NewSubscriber(destination) - s.mu.Lock() - defer s.mu.Unlock() - switch s.status { case KindNext: // fallthrough @@ -95,15 +90,11 @@ func (s *publishSubjectImpl[T]) Next(value T) { // Implements Observer. func (s *publishSubjectImpl[T]) NextWithContext(ctx context.Context, value T) { - s.mu.Lock() - if s.status == KindNext { s.broadcastNext(ctx, value) } else { OnDroppedNotification(ctx, NewNotificationNext(value)) } - - s.mu.Unlock() } // Implements Observer. @@ -113,8 +104,6 @@ func (s *publishSubjectImpl[T]) Error(err error) { // Implements Observer. func (s *publishSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) { - s.mu.Lock() - if s.status == KindNext { s.err = lo.T2(ctx, err) s.status = KindError @@ -123,7 +112,6 @@ func (s *publishSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) OnDroppedNotification(ctx, NewNotificationError[T](err)) } - s.mu.Unlock() s.unsubscribeAll() } @@ -134,8 +122,6 @@ func (s *publishSubjectImpl[T]) Complete() { // Implements Observer. func (s *publishSubjectImpl[T]) CompleteWithContext(ctx context.Context) { - s.mu.Lock() - if s.status == KindNext { s.status = KindComplete s.broadcastComplete(ctx) @@ -143,7 +129,6 @@ func (s *publishSubjectImpl[T]) CompleteWithContext(ctx context.Context) { OnDroppedNotification(ctx, NewNotificationComplete[T]()) } - s.mu.Unlock() s.unsubscribeAll() } @@ -171,25 +156,16 @@ func (s *publishSubjectImpl[T]) CountObservers() int { // Implements Observer. func (s *publishSubjectImpl[T]) IsClosed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status != KindNext } // Implements Observer. func (s *publishSubjectImpl[T]) HasThrown() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindError } // Implements Observer. func (s *publishSubjectImpl[T]) IsCompleted() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindComplete } diff --git a/subject_replay.go b/subject_replay.go index 70722b4..5604933 100644 --- a/subject_replay.go +++ b/subject_replay.go @@ -31,7 +31,6 @@ var _ Subject[int] = (*replaySubjectImpl[int])(nil) // After error or completion, new subscriptions receive values from the buffer then the error or the completion. func NewReplaySubject[T any](bufferSize int) Subject[T] { return &replaySubjectImpl[T]{ - mu: sync.Mutex{}, status: KindNext, observers: sync.Map{}, @@ -44,7 +43,6 @@ func NewReplaySubject[T any](bufferSize int) Subject[T] { } type replaySubjectImpl[T any] struct { - mu sync.Mutex // sync.RWMutex would be better, but it is too slow for high-volume subjects status Kind observers sync.Map @@ -64,9 +62,6 @@ func (s *replaySubjectImpl[T]) Subscribe(destination Observer[T]) Subscription { func (s *replaySubjectImpl[T]) SubscribeWithContext(subscriberCtx context.Context, destination Observer[T]) Subscription { subscription := NewSubscriber(destination) - s.mu.Lock() - defer s.mu.Unlock() - for _, v := range s.values { subscription.NextWithContext(v.A, v.B) } @@ -106,8 +101,6 @@ func (s *replaySubjectImpl[T]) Next(value T) { // Implements Observer. func (s *replaySubjectImpl[T]) NextWithContext(ctx context.Context, value T) { - s.mu.Lock() - if s.status == KindNext { s.broadcastNext(ctx, value) @@ -119,8 +112,6 @@ func (s *replaySubjectImpl[T]) NextWithContext(ctx context.Context, value T) { } else { OnDroppedNotification(ctx, NewNotificationNext(value)) } - - s.mu.Unlock() } // Implements Observer. @@ -130,8 +121,6 @@ func (s *replaySubjectImpl[T]) Error(err error) { // Implements Observer. func (s *replaySubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) { - s.mu.Lock() - if s.status == KindNext { s.err = lo.T2(ctx, err) s.status = KindError @@ -140,7 +129,6 @@ func (s *replaySubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) OnDroppedNotification(ctx, NewNotificationError[T](err)) } - s.mu.Unlock() s.unsubscribeAll() } @@ -151,8 +139,6 @@ func (s *replaySubjectImpl[T]) Complete() { // Implements Observer. func (s *replaySubjectImpl[T]) CompleteWithContext(ctx context.Context) { - s.mu.Lock() - if s.status == KindNext { s.status = KindComplete s.broadcastComplete(ctx) @@ -160,7 +146,6 @@ func (s *replaySubjectImpl[T]) CompleteWithContext(ctx context.Context) { OnDroppedNotification(ctx, NewNotificationComplete[T]()) } - s.mu.Unlock() s.unsubscribeAll() } @@ -188,25 +173,16 @@ func (s *replaySubjectImpl[T]) CountObservers() int { // Implements Observer. func (s *replaySubjectImpl[T]) IsClosed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status != KindNext } // Implements Observer. func (s *replaySubjectImpl[T]) HasThrown() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindError } // Implements Observer. func (s *replaySubjectImpl[T]) IsCompleted() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindComplete } diff --git a/subject_unicast.go b/subject_unicast.go index fb36ee5..9dc29cb 100644 --- a/subject_unicast.go +++ b/subject_unicast.go @@ -16,7 +16,6 @@ package ro import ( "context" - "sync" "github.com/samber/lo" ) @@ -31,7 +30,6 @@ var _ Subject[int] = (*unicastSubjectImpl[int])(nil) // to relaying events live to this single Observer. func NewUnicastSubject[T any](bufferSize int) Subject[T] { return &unicastSubjectImpl[T]{ - mu: sync.Mutex{}, status: KindNext, observer: nil, @@ -43,7 +41,6 @@ func NewUnicastSubject[T any](bufferSize int) Subject[T] { } type unicastSubjectImpl[T any] struct { - mu sync.Mutex // sync.RWMutex would be better, but it is too slow for high-volume subjects status Kind observer Observer[T] @@ -62,9 +59,6 @@ func (s *unicastSubjectImpl[T]) Subscribe(destination Observer[T]) Subscription func (s *unicastSubjectImpl[T]) SubscribeWithContext(subscriberCtx context.Context, destination Observer[T]) Subscription { subscription := NewSubscriber(destination) - s.mu.Lock() - defer s.mu.Unlock() - switch s.status { case KindNext: // fallthrough @@ -90,9 +84,7 @@ func (s *unicastSubjectImpl[T]) SubscribeWithContext(subscriberCtx context.Conte s.observer = subscription subscription.Add(func() { - s.mu.Lock() s.observer = nil - s.mu.Unlock() }) return subscription @@ -105,8 +97,6 @@ func (s *unicastSubjectImpl[T]) Next(value T) { // Implements Observer. func (s *unicastSubjectImpl[T]) NextWithContext(ctx context.Context, value T) { - s.mu.Lock() - if s.status == KindNext { //nolint:nestif if s.observer != nil { tmp := s.observer @@ -121,8 +111,6 @@ func (s *unicastSubjectImpl[T]) NextWithContext(ctx context.Context, value T) { } else { OnDroppedNotification(ctx, NewNotificationNext(value)) } - - s.mu.Unlock() } // Implements Observer. @@ -132,8 +120,6 @@ func (s *unicastSubjectImpl[T]) Error(err error) { // Implements Observer. func (s *unicastSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) { - s.mu.Lock() - if s.status == KindNext { s.err = lo.T2(ctx, err) s.status = KindError @@ -149,8 +135,6 @@ func (s *unicastSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) } else { OnDroppedNotification(ctx, NewNotificationError[T](err)) } - - s.mu.Unlock() } // Implements Observer. @@ -160,8 +144,6 @@ func (s *unicastSubjectImpl[T]) Complete() { // Implements Observer. func (s *unicastSubjectImpl[T]) CompleteWithContext(ctx context.Context) { - s.mu.Lock() - if s.status == KindNext { s.status = KindComplete @@ -176,21 +158,13 @@ func (s *unicastSubjectImpl[T]) CompleteWithContext(ctx context.Context) { } else { OnDroppedNotification(ctx, NewNotificationComplete[T]()) } - - s.mu.Unlock() } func (s *unicastSubjectImpl[T]) HasObserver() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.observer != nil } func (s *unicastSubjectImpl[T]) CountObservers() int { - s.mu.Lock() - defer s.mu.Unlock() - if s.observer != nil { return 1 } @@ -200,25 +174,16 @@ func (s *unicastSubjectImpl[T]) CountObservers() int { // Implements Observer. func (s *unicastSubjectImpl[T]) IsClosed() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status != KindNext } // Implements Observer. func (s *unicastSubjectImpl[T]) HasThrown() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindError } // Implements Observer. func (s *unicastSubjectImpl[T]) IsCompleted() bool { - s.mu.Lock() - defer s.mu.Unlock() - return s.status == KindComplete }