Skip to content

Commit 27c373a

Browse files
Merge pull request #486 from benjchristensen/bugfix-async-subject
BugFix: AsyncSubject
2 parents 95717fc + b9d29ce commit 27c373a

File tree

2 files changed

+239
-32
lines changed

2 files changed

+239
-32
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 89 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
import java.util.concurrent.ConcurrentHashMap;
1919
import java.util.concurrent.atomic.AtomicBoolean;
2020
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.concurrent.locks.ReentrantLock;
2122

23+
import rx.Notification;
2224
import rx.Observer;
2325
import rx.Subscription;
2426
import rx.operators.SafeObservableSubscription;
27+
import rx.subscriptions.Subscriptions;
2528

2629
/**
2730
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the
@@ -60,61 +63,115 @@ public class AsyncSubject<T> extends Subject<T, T> {
6063
* @return a new AsyncSubject
6164
*/
6265
public static <T> AsyncSubject<T> create() {
63-
final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
66+
final AsyncSubjectState<T> state = new AsyncSubjectState<T>();
6467

6568
OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
6669
@Override
6770
public Subscription onSubscribe(Observer<? super T> observer) {
68-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
69-
70-
subscription.wrap(new Subscription() {
71-
@Override
72-
public void unsubscribe() {
73-
// on unsubscribe remove it from the map of outbound observers to notify
74-
observers.remove(subscription);
71+
/*
72+
* Subscription needs to be synchronized with terminal states to ensure
73+
* race conditions are handled. When subscribing we must make sure
74+
* onComplete/onError is correctly emitted to all observers, even if it
75+
* comes in while the onComplete/onError is being propagated.
76+
*/
77+
state.SUBSCRIPTION_LOCK.lock();
78+
try {
79+
if (state.completed.get()) {
80+
emitNotificationToObserver(state, observer);
81+
return Subscriptions.empty();
82+
} else {
83+
// the subject is not completed so we subscribe
84+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
85+
86+
subscription.wrap(new Subscription() {
87+
@Override
88+
public void unsubscribe() {
89+
// on unsubscribe remove it from the map of outbound observers to notify
90+
state.observers.remove(subscription);
91+
}
92+
});
93+
94+
// on subscribe add it to the map of outbound observers to notify
95+
state.observers.put(subscription, observer);
96+
97+
return subscription;
7598
}
76-
});
99+
} finally {
100+
state.SUBSCRIPTION_LOCK.unlock();
101+
}
77102

78-
// on subscribe add it to the map of outbound observers to notify
79-
observers.put(subscription, observer);
80-
return subscription;
81103
}
104+
82105
};
83106

84-
return new AsyncSubject<T>(onSubscribe, observers);
107+
return new AsyncSubject<T>(onSubscribe, state);
85108
}
86109

87-
private final ConcurrentHashMap<Subscription, Observer<? super T>> observers;
88-
private final AtomicReference<T> currentValue;
89-
private final AtomicBoolean hasValue = new AtomicBoolean();
110+
private static <T> void emitNotificationToObserver(final AsyncSubjectState<T> state, Observer<? super T> observer) {
111+
Notification<T> finalValue = state.currentValue.get();
112+
113+
// if null that means onNext was never invoked (no Notification set)
114+
if (finalValue != null) {
115+
if (finalValue.isOnNext()) {
116+
observer.onNext(finalValue.getValue());
117+
} else if (finalValue.isOnError()) {
118+
observer.onError(finalValue.getThrowable());
119+
}
120+
}
121+
observer.onCompleted();
122+
}
90123

91-
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, ConcurrentHashMap<Subscription, Observer<? super T>> observers) {
124+
/**
125+
* State externally constructed and passed in so the onSubscribe function has access to it.
126+
*
127+
* @param <T>
128+
*/
129+
private static class AsyncSubjectState<T> {
130+
private final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
131+
private final AtomicReference<Notification<T>> currentValue = new AtomicReference<Notification<T>>();
132+
private final AtomicBoolean completed = new AtomicBoolean();
133+
private final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock();
134+
}
135+
136+
private final AsyncSubjectState<T> state;
137+
138+
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, AsyncSubjectState<T> state) {
92139
super(onSubscribe);
93-
this.observers = observers;
94-
this.currentValue = new AtomicReference<T>();
140+
this.state = state;
95141
}
96142

97143
@Override
98144
public void onCompleted() {
99-
T finalValue = currentValue.get();
100-
for (Observer<? super T> observer : observers.values()) {
101-
if (hasValue.get()) {
102-
observer.onNext(finalValue);
103-
}
104-
observer.onCompleted();
105-
}
145+
terminalState();
106146
}
107147

108148
@Override
109149
public void onError(Throwable e) {
110-
for (Observer<? super T> observer : observers.values()) {
111-
observer.onError(e);
112-
}
150+
state.currentValue.set(new Notification<T>(e));
151+
terminalState();
113152
}
114153

115154
@Override
116-
public void onNext(T args) {
117-
hasValue.set(true);
118-
currentValue.set(args);
155+
public void onNext(T v) {
156+
state.currentValue.set(new Notification<T>(v));
157+
}
158+
159+
private void terminalState() {
160+
/*
161+
* We can not allow new subscribers to be added while we execute the terminal state.
162+
*/
163+
state.SUBSCRIPTION_LOCK.lock();
164+
try {
165+
if (state.completed.compareAndSet(false, true)) {
166+
for (Subscription s : state.observers.keySet()) {
167+
// emit notifications to this observer
168+
emitNotificationToObserver(state, state.observers.get(s));
169+
// remove the subscription as it is completed
170+
state.observers.remove(s);
171+
}
172+
}
173+
} finally {
174+
state.SUBSCRIPTION_LOCK.unlock();
175+
}
119176
}
120177
}

rxjava-core/src/test/java/rx/subjects/AsyncSubjectTest.java

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,13 @@
1515
*/
1616
package rx.subjects;
1717

18+
import static org.junit.Assert.*;
1819
import static org.mockito.Matchers.*;
1920
import static org.mockito.Mockito.*;
2021

22+
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicReference;
24+
2125
import org.junit.Test;
2226
import org.mockito.InOrder;
2327
import org.mockito.Mockito;
@@ -66,6 +70,62 @@ public void testCompleted() {
6670
verify(aObserver, times(1)).onCompleted();
6771
}
6872

73+
@Test
74+
public void testNull() {
75+
AsyncSubject<String> subject = AsyncSubject.create();
76+
77+
@SuppressWarnings("unchecked")
78+
Observer<String> aObserver = mock(Observer.class);
79+
subject.subscribe(aObserver);
80+
81+
subject.onNext(null);
82+
subject.onCompleted();
83+
84+
verify(aObserver, times(1)).onNext(null);
85+
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
86+
verify(aObserver, times(1)).onCompleted();
87+
}
88+
89+
@Test
90+
public void testSubscribeAfterCompleted() {
91+
AsyncSubject<String> subject = AsyncSubject.create();
92+
93+
@SuppressWarnings("unchecked")
94+
Observer<String> aObserver = mock(Observer.class);
95+
96+
subject.onNext("one");
97+
subject.onNext("two");
98+
subject.onNext("three");
99+
subject.onCompleted();
100+
101+
subject.subscribe(aObserver);
102+
103+
verify(aObserver, times(1)).onNext("three");
104+
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
105+
verify(aObserver, times(1)).onCompleted();
106+
}
107+
108+
@Test
109+
public void testSubscribeAfterError() {
110+
AsyncSubject<String> subject = AsyncSubject.create();
111+
112+
@SuppressWarnings("unchecked")
113+
Observer<String> aObserver = mock(Observer.class);
114+
115+
subject.onNext("one");
116+
subject.onNext("two");
117+
subject.onNext("three");
118+
119+
RuntimeException re = new RuntimeException("failed");
120+
subject.onError(re);
121+
122+
subject.subscribe(aObserver);
123+
124+
verify(aObserver, times(1)).onError(re);
125+
verify(aObserver, Mockito.never()).onNext(any(String.class));
126+
verify(aObserver, Mockito.never()).onCompleted();
127+
}
128+
69129
@Test
70130
public void testError() {
71131
AsyncSubject<String> subject = AsyncSubject.create();
@@ -151,4 +211,94 @@ public void testEmptySubjectCompleted() {
151211
inOrder.verify(aObserver, times(1)).onCompleted();
152212
inOrder.verifyNoMoreInteractions();
153213
}
214+
215+
/**
216+
* Can receive timeout if subscribe never receives an onError/onCompleted ... which reveals a race condition.
217+
*/
218+
@Test
219+
public void testSubscribeCompletionRaceCondition() {
220+
/*
221+
* With non-threadsafe code this fails most of the time on my dev laptop and is non-deterministic enough
222+
* to act as a unit test to the race conditions.
223+
*
224+
* With the synchronization code in place I can not get this to fail on my laptop.
225+
*/
226+
for (int i = 0; i < 50; i++) {
227+
final AsyncSubject<String> subject = AsyncSubject.create();
228+
final AtomicReference<String> value1 = new AtomicReference<String>();
229+
230+
subject.subscribe(new Action1<String>() {
231+
232+
@Override
233+
public void call(String t1) {
234+
try {
235+
// simulate a slow observer
236+
Thread.sleep(50);
237+
} catch (InterruptedException e) {
238+
e.printStackTrace();
239+
}
240+
value1.set(t1);
241+
}
242+
243+
});
244+
245+
Thread t1 = new Thread(new Runnable() {
246+
247+
@Override
248+
public void run() {
249+
subject.onNext("value");
250+
subject.onCompleted();
251+
}
252+
});
253+
254+
SubjectObserverThread t2 = new SubjectObserverThread(subject);
255+
SubjectObserverThread t3 = new SubjectObserverThread(subject);
256+
SubjectObserverThread t4 = new SubjectObserverThread(subject);
257+
SubjectObserverThread t5 = new SubjectObserverThread(subject);
258+
259+
t2.start();
260+
t3.start();
261+
t1.start();
262+
t4.start();
263+
t5.start();
264+
try {
265+
t1.join();
266+
t2.join();
267+
t3.join();
268+
t4.join();
269+
t5.join();
270+
} catch (InterruptedException e) {
271+
throw new RuntimeException(e);
272+
}
273+
274+
assertEquals("value", value1.get());
275+
assertEquals("value", t2.value.get());
276+
assertEquals("value", t3.value.get());
277+
assertEquals("value", t4.value.get());
278+
assertEquals("value", t5.value.get());
279+
}
280+
281+
}
282+
283+
private static class SubjectObserverThread extends Thread {
284+
285+
private final AsyncSubject<String> subject;
286+
private final AtomicReference<String> value = new AtomicReference<String>();
287+
288+
public SubjectObserverThread(AsyncSubject<String> subject) {
289+
this.subject = subject;
290+
}
291+
292+
@Override
293+
public void run() {
294+
try {
295+
// a timeout exception will happen if we don't get a terminal state
296+
String v = subject.timeout(2000, TimeUnit.MILLISECONDS).toBlockingObservable().single();
297+
value.set(v);
298+
} catch (Exception e) {
299+
e.printStackTrace();
300+
}
301+
}
302+
}
303+
154304
}

0 commit comments

Comments
 (0)