Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 0 additions & 24 deletions subject_async.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ var _ Subject[int] = (*asyncSubjectImpl[int])(nil)
// The emitted value or error is stored for future subscriptions. 0 or 1 value is emitted.
func NewAsyncSubject[T any]() Subject[T] {
return &asyncSubjectImpl[T]{
mu: sync.Mutex{},
status: 0,

observers: sync.Map{},
Expand All @@ -41,7 +40,6 @@ func NewAsyncSubject[T any]() Subject[T] {
}

type asyncSubjectImpl[T any] struct {
mu sync.Mutex // sync.RWMutex would be better, but it is too slow for high-volume subjects
status Kind

observers sync.Map
Expand All @@ -61,9 +59,6 @@ func (s *asyncSubjectImpl[T]) Subscribe(destination Observer[T]) Subscription {
func (s *asyncSubjectImpl[T]) SubscribeWithContext(subscriberCtx context.Context, destination Observer[T]) Subscription {
subscription := NewSubscriber(destination)

s.mu.Lock()
defer s.mu.Unlock()

switch s.status {
case KindNext:
// fallthrough
Expand Down Expand Up @@ -104,16 +99,12 @@ func (s *asyncSubjectImpl[T]) Next(value T) {

// Implements Observer.
func (s *asyncSubjectImpl[T]) NextWithContext(ctx context.Context, value T) {
s.mu.Lock()

if s.status == KindNext {
s.hasValue = true
s.value = lo.T2(ctx, value) // A previous value might be erased. It won't be forwarded to `OnDroppedNotification`.
} else {
OnDroppedNotification(ctx, NewNotificationNext(value))
}

s.mu.Unlock()
}

// Implements Observer.
Expand All @@ -123,8 +114,6 @@ func (s *asyncSubjectImpl[T]) Error(err error) {

// Implements Observer.
func (s *asyncSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) {
s.mu.Lock()

if s.status == KindNext {
s.err = lo.T2(ctx, err)
s.status = KindError
Expand All @@ -133,7 +122,6 @@ func (s *asyncSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) {
OnDroppedNotification(ctx, NewNotificationError[T](err))
}

s.mu.Unlock()
s.unsubscribeAll()
}

Expand All @@ -144,8 +132,6 @@ func (s *asyncSubjectImpl[T]) Complete() {

// Implements Observer.
func (s *asyncSubjectImpl[T]) CompleteWithContext(ctx context.Context) {
s.mu.Lock()

if s.status == KindNext {
s.status = KindComplete
if s.hasValue {
Expand All @@ -157,7 +143,6 @@ func (s *asyncSubjectImpl[T]) CompleteWithContext(ctx context.Context) {
OnDroppedNotification(ctx, NewNotificationComplete[T]())
}

s.mu.Unlock()
s.unsubscribeAll()
}

Expand Down Expand Up @@ -185,25 +170,16 @@ func (s *asyncSubjectImpl[T]) CountObservers() int {

// Implements Observer.
func (s *asyncSubjectImpl[T]) IsClosed() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.status != KindNext
}

// Implements Observer.
func (s *asyncSubjectImpl[T]) HasThrown() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.status == KindError
}

// Implements Observer.
func (s *asyncSubjectImpl[T]) IsCompleted() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.status == KindComplete
}

Expand Down
24 changes: 0 additions & 24 deletions subject_behavior.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ var _ Subject[int] = (*behaviorSubjectImpl[int])(nil)
// After completion, new subscription won't receive the last value, but the error will eventually propagated.
func NewBehaviorSubject[T any](initial T) Subject[T] {
return &behaviorSubjectImpl[T]{
mu: sync.Mutex{},
status: KindNext,

observers: sync.Map{},
Expand All @@ -40,7 +39,6 @@ func NewBehaviorSubject[T any](initial T) Subject[T] {
}

type behaviorSubjectImpl[T any] struct {
mu sync.Mutex // sync.RWMutex would be better, but it is too slow for high-volume subjects
status Kind

observers sync.Map
Expand All @@ -59,9 +57,6 @@ func (s *behaviorSubjectImpl[T]) Subscribe(destination Observer[T]) Subscription
func (s *behaviorSubjectImpl[T]) SubscribeWithContext(subscriberCtx context.Context, destination Observer[T]) Subscription {
subscription := NewSubscriber(destination)

s.mu.Lock()
defer s.mu.Unlock()

switch s.status {
case KindNext:
// fallthrough
Expand Down Expand Up @@ -100,16 +95,12 @@ func (s *behaviorSubjectImpl[T]) Next(value T) {

// Implements Observer.
func (s *behaviorSubjectImpl[T]) NextWithContext(ctx context.Context, value T) {
s.mu.Lock()

if s.status == KindNext {
s.last = lo.T2(ctx, value)
s.broadcastNext(ctx, value)
} else {
OnDroppedNotification(ctx, NewNotificationNext(value))
}

s.mu.Unlock()
}

// Implements Observer.
Expand All @@ -119,8 +110,6 @@ func (s *behaviorSubjectImpl[T]) Error(err error) {

// Implements Observer.
func (s *behaviorSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) {
s.mu.Lock()

if s.status == KindNext {
s.err = lo.T2(ctx, err)
s.status = KindError
Expand All @@ -129,7 +118,6 @@ func (s *behaviorSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error
OnDroppedNotification(ctx, NewNotificationError[T](err))
}

s.mu.Unlock()
s.unsubscribeAll()
}

Expand All @@ -140,16 +128,13 @@ func (s *behaviorSubjectImpl[T]) Complete() {

// Implements Observer.
func (s *behaviorSubjectImpl[T]) CompleteWithContext(ctx context.Context) {
s.mu.Lock()

if s.status == KindNext {
s.status = KindComplete
s.broadcastComplete(ctx)
} else {
OnDroppedNotification(ctx, NewNotificationComplete[T]())
}

s.mu.Unlock()
s.unsubscribeAll()
}

Expand Down Expand Up @@ -177,25 +162,16 @@ func (s *behaviorSubjectImpl[T]) CountObservers() int {

// Implements Observer.
func (s *behaviorSubjectImpl[T]) IsClosed() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.status != KindNext
}

// Implements Observer.
func (s *behaviorSubjectImpl[T]) HasThrown() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.status == KindError
}

// Implements Observer.
func (s *behaviorSubjectImpl[T]) IsCompleted() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.status == KindComplete
}

Expand Down
24 changes: 0 additions & 24 deletions subject_publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ var _ Subject[int] = (*publishSubjectImpl[int])(nil)
// Values received before subscription are not transmitted.
func NewPublishSubject[T any]() Subject[T] {
return &publishSubjectImpl[T]{
mu: sync.Mutex{},
status: KindNext,

observers: sync.Map{},
Expand All @@ -39,7 +38,6 @@ func NewPublishSubject[T any]() Subject[T] {
}

type publishSubjectImpl[T any] struct {
mu sync.Mutex // sync.RWMutex would be better, but it is too slow for high-volume subjects
status Kind

observers sync.Map
Expand All @@ -57,9 +55,6 @@ func (s *publishSubjectImpl[T]) Subscribe(destination Observer[T]) Subscription
func (s *publishSubjectImpl[T]) SubscribeWithContext(subscriberCtx context.Context, destination Observer[T]) Subscription {
subscription := NewSubscriber(destination)

s.mu.Lock()
defer s.mu.Unlock()

switch s.status {
case KindNext:
// fallthrough
Expand Down Expand Up @@ -95,15 +90,11 @@ func (s *publishSubjectImpl[T]) Next(value T) {

// Implements Observer.
func (s *publishSubjectImpl[T]) NextWithContext(ctx context.Context, value T) {
s.mu.Lock()

if s.status == KindNext {
s.broadcastNext(ctx, value)
} else {
OnDroppedNotification(ctx, NewNotificationNext(value))
}

s.mu.Unlock()
}

// Implements Observer.
Expand All @@ -113,8 +104,6 @@ func (s *publishSubjectImpl[T]) Error(err error) {

// Implements Observer.
func (s *publishSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error) {
s.mu.Lock()

if s.status == KindNext {
s.err = lo.T2(ctx, err)
s.status = KindError
Expand All @@ -123,7 +112,6 @@ func (s *publishSubjectImpl[T]) ErrorWithContext(ctx context.Context, err error)
OnDroppedNotification(ctx, NewNotificationError[T](err))
}

s.mu.Unlock()
s.unsubscribeAll()
}

Expand All @@ -134,16 +122,13 @@ func (s *publishSubjectImpl[T]) Complete() {

// Implements Observer.
func (s *publishSubjectImpl[T]) CompleteWithContext(ctx context.Context) {
s.mu.Lock()

if s.status == KindNext {
s.status = KindComplete
s.broadcastComplete(ctx)
} else {
OnDroppedNotification(ctx, NewNotificationComplete[T]())
}

s.mu.Unlock()
s.unsubscribeAll()
}

Expand Down Expand Up @@ -171,25 +156,16 @@ func (s *publishSubjectImpl[T]) CountObservers() int {

// Implements Observer.
func (s *publishSubjectImpl[T]) IsClosed() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.status != KindNext
}

// Implements Observer.
func (s *publishSubjectImpl[T]) HasThrown() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.status == KindError
}

// Implements Observer.
func (s *publishSubjectImpl[T]) IsCompleted() bool {
s.mu.Lock()
defer s.mu.Unlock()

return s.status == KindComplete
}

Expand Down
Loading