Skip to content

Commit d66df7f

Browse files
Subject Error Handling
Fixes #1685 by delaying errors that are caught until after all subscribers have a chance to receive the event. Note that this has a lot of code duplication to handle this across the Subject implementations. It may be worth abstracting this logic ... but right now I'm just doing what makes sense to fix this as the Subject abstractions are non-trivial.
1 parent b2fe579 commit d66df7f

9 files changed

+297
-6
lines changed

src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
*/
1616
package rx.subjects;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
20+
1821
import rx.Observer;
22+
import rx.exceptions.CompositeException;
23+
import rx.exceptions.Exceptions;
1924
import rx.functions.Action1;
2025
import rx.internal.operators.NotificationLite;
2126
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
@@ -104,8 +109,24 @@ public void onCompleted() {
104109
public void onError(final Throwable e) {
105110
if (state.active) {
106111
Object n = nl.error(e);
112+
List<Throwable> errors = null;
107113
for (SubjectObserver<T> bo : state.terminate(n)) {
108-
bo.onError(e);
114+
try {
115+
bo.onError(e);
116+
} catch (Throwable e2) {
117+
if (errors == null) {
118+
errors = new ArrayList<Throwable>();
119+
}
120+
errors.add(e2);
121+
}
122+
}
123+
124+
if (errors != null) {
125+
if (errors.size() == 1) {
126+
Exceptions.propagate(errors.get(0));
127+
} else {
128+
throw new CompositeException("Errors while emitting AsyncSubject.onError", errors);
129+
}
109130
}
110131
}
111132
}

src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
package rx.subjects;
1717

1818

19+
import java.util.ArrayList;
20+
import java.util.List;
21+
1922
import rx.Observer;
23+
import rx.exceptions.CompositeException;
24+
import rx.exceptions.Exceptions;
2025
import rx.functions.Action1;
2126
import rx.internal.operators.NotificationLite;
2227
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
@@ -131,8 +136,24 @@ public void onError(Throwable e) {
131136
Object last = state.get();
132137
if (last == null || state.active) {
133138
Object n = nl.error(e);
139+
List<Throwable> errors = null;
134140
for (SubjectObserver<T> bo : state.terminate(n)) {
135-
bo.emitNext(n, state.nl);
141+
try {
142+
bo.emitNext(n, state.nl);
143+
} catch (Throwable e2) {
144+
if (errors == null) {
145+
errors = new ArrayList<Throwable>();
146+
}
147+
errors.add(e2);
148+
}
149+
}
150+
151+
if (errors != null) {
152+
if (errors.size() == 1) {
153+
Exceptions.propagate(errors.get(0));
154+
} else {
155+
throw new CompositeException("Errors while emitting AsyncSubject.onError", errors);
156+
}
136157
}
137158
}
138159
}

src/main/java/rx/subjects/PublishSubject.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
*/
1616
package rx.subjects;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
20+
1821
import rx.Observer;
22+
import rx.exceptions.CompositeException;
23+
import rx.exceptions.Exceptions;
1924
import rx.functions.Action1;
2025
import rx.internal.operators.NotificationLite;
2126
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
@@ -89,8 +94,23 @@ public void onCompleted() {
8994
public void onError(final Throwable e) {
9095
if (state.active) {
9196
Object n = nl.error(e);
97+
List<Throwable> errors = null;
9298
for (SubjectObserver<T> bo : state.terminate(n)) {
93-
bo.emitNext(n, state.nl);
99+
try {
100+
bo.emitNext(n, state.nl);
101+
} catch (Throwable e2) {
102+
if (errors == null) {
103+
errors = new ArrayList<Throwable>();
104+
}
105+
errors.add(e2);
106+
}
107+
}
108+
if (errors != null) {
109+
if (errors.size() == 1) {
110+
Exceptions.propagate(errors.get(0));
111+
} else {
112+
throw new CompositeException("Errors while emitting PublishSubject.onError", errors);
113+
}
94114
}
95115
}
96116
}

src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
package rx.subjects;
1717

1818
import java.util.ArrayList;
19+
import java.util.List;
1920
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2122

2223
import rx.Observer;
2324
import rx.Scheduler;
25+
import rx.exceptions.CompositeException;
26+
import rx.exceptions.Exceptions;
2427
import rx.functions.Action1;
2528
import rx.functions.Func1;
2629
import rx.functions.Functions;
@@ -303,9 +306,25 @@ public void onNext(T t) {
303306
public void onError(final Throwable e) {
304307
if (ssm.active) {
305308
state.error(e);
309+
List<Throwable> errors = null;
306310
for (SubjectObserver<? super T> o : ssm.terminate(NotificationLite.instance().error(e))) {
307-
if (caughtUp(o)) {
308-
o.onError(e);
311+
try {
312+
if (caughtUp(o)) {
313+
o.onError(e);
314+
}
315+
} catch (Throwable e2) {
316+
if (errors == null) {
317+
errors = new ArrayList<Throwable>();
318+
}
319+
errors.add(e2);
320+
}
321+
}
322+
323+
if (errors != null) {
324+
if (errors.size() == 1) {
325+
Exceptions.propagate(errors.get(0));
326+
} else {
327+
throw new CompositeException("Errors while emitting ReplaySubject.onError", errors);
309328
}
310329
}
311330
}

src/test/java/rx/ObservableTests.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,15 @@
4848
import rx.Observable.OnSubscribe;
4949
import rx.Observable.Transformer;
5050
import rx.exceptions.OnErrorNotImplementedException;
51-
import rx.functions.Action0;
5251
import rx.functions.Action1;
5352
import rx.functions.Action2;
5453
import rx.functions.Func1;
5554
import rx.functions.Func2;
5655
import rx.observables.ConnectableObservable;
5756
import rx.observers.TestSubscriber;
5857
import rx.schedulers.TestScheduler;
58+
import rx.subjects.ReplaySubject;
59+
import rx.subjects.Subject;
5960
import rx.subscriptions.BooleanSubscription;
6061

6162
public class ObservableTests {
@@ -1125,5 +1126,21 @@ public String call(Integer t1) {
11251126
ts.assertNoErrors();
11261127
ts.assertReceivedOnNext(Arrays.asList("1", "2", "3"));
11271128
}
1129+
1130+
@Test
1131+
public void testErrorThrownIssue1685() {
1132+
Subject<Object, Object> subject = ReplaySubject.create();
1133+
1134+
Observable.error(new RuntimeException("oops"))
1135+
.materialize()
1136+
.delay(1, TimeUnit.SECONDS)
1137+
.dematerialize()
1138+
.subscribe(subject);
1139+
1140+
subject.subscribe();
1141+
subject.materialize().toBlocking().first();
1142+
1143+
System.out.println("Done");
1144+
}
11281145

11291146
}

src/test/java/rx/subjects/AsyncSubjectTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.subjects;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.fail;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Matchers.anyString;
2122
import static org.mockito.Mockito.inOrder;
@@ -33,7 +34,10 @@
3334

3435
import rx.Observer;
3536
import rx.Subscription;
37+
import rx.exceptions.CompositeException;
38+
import rx.exceptions.OnErrorNotImplementedException;
3639
import rx.functions.Action1;
40+
import rx.observers.TestSubscriber;
3741

3842
public class AsyncSubjectTest {
3943

@@ -281,5 +285,49 @@ public void run() {
281285
}
282286
}
283287
}
288+
289+
@Test
290+
public void testOnErrorThrowsDoesntPreventDelivery() {
291+
AsyncSubject<String> ps = AsyncSubject.create();
292+
293+
ps.subscribe();
294+
TestSubscriber<String> ts = new TestSubscriber<String>();
295+
ps.subscribe(ts);
296+
297+
try {
298+
ps.onError(new RuntimeException("an exception"));
299+
fail("expect OnErrorNotImplementedException");
300+
} catch (OnErrorNotImplementedException e) {
301+
// ignore
302+
}
303+
// even though the onError above throws we should still receive it on the other subscriber
304+
assertEquals(1, ts.getOnErrorEvents().size());
305+
}
306+
307+
/**
308+
* This one has multiple failures so should get a CompositeException
309+
*/
310+
@Test
311+
public void testOnErrorThrowsDoesntPreventDelivery2() {
312+
AsyncSubject<String> ps = AsyncSubject.create();
313+
314+
ps.subscribe();
315+
ps.subscribe();
316+
TestSubscriber<String> ts = new TestSubscriber<String>();
317+
ps.subscribe(ts);
318+
ps.subscribe();
319+
ps.subscribe();
320+
ps.subscribe();
321+
322+
try {
323+
ps.onError(new RuntimeException("an exception"));
324+
fail("expect OnErrorNotImplementedException");
325+
} catch (CompositeException e) {
326+
// we should have 5 of them
327+
assertEquals(5, e.getExceptions().size());
328+
}
329+
// even though the onError above throws we should still receive it on the other subscriber
330+
assertEquals(1, ts.getOnErrorEvents().size());
331+
}
284332

285333
}

src/test/java/rx/subjects/BehaviorSubjectTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.subjects;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.fail;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Mockito.inOrder;
2122
import static org.mockito.Mockito.mock;
@@ -30,7 +31,10 @@
3031
import rx.Observable;
3132
import rx.Observer;
3233
import rx.Subscription;
34+
import rx.exceptions.CompositeException;
35+
import rx.exceptions.OnErrorNotImplementedException;
3336
import rx.functions.Func1;
37+
import rx.observers.TestSubscriber;
3438

3539
public class BehaviorSubjectTest {
3640

@@ -367,4 +371,48 @@ public void testTakeOneSubscriber() {
367371

368372
assertEquals(0, source.subscriberCount());
369373
}
374+
375+
@Test
376+
public void testOnErrorThrowsDoesntPreventDelivery() {
377+
BehaviorSubject<String> ps = BehaviorSubject.create();
378+
379+
ps.subscribe();
380+
TestSubscriber<String> ts = new TestSubscriber<String>();
381+
ps.subscribe(ts);
382+
383+
try {
384+
ps.onError(new RuntimeException("an exception"));
385+
fail("expect OnErrorNotImplementedException");
386+
} catch (OnErrorNotImplementedException e) {
387+
// ignore
388+
}
389+
// even though the onError above throws we should still receive it on the other subscriber
390+
assertEquals(1, ts.getOnErrorEvents().size());
391+
}
392+
393+
/**
394+
* This one has multiple failures so should get a CompositeException
395+
*/
396+
@Test
397+
public void testOnErrorThrowsDoesntPreventDelivery2() {
398+
BehaviorSubject<String> ps = BehaviorSubject.create();
399+
400+
ps.subscribe();
401+
ps.subscribe();
402+
TestSubscriber<String> ts = new TestSubscriber<String>();
403+
ps.subscribe(ts);
404+
ps.subscribe();
405+
ps.subscribe();
406+
ps.subscribe();
407+
408+
try {
409+
ps.onError(new RuntimeException("an exception"));
410+
fail("expect OnErrorNotImplementedException");
411+
} catch (CompositeException e) {
412+
// we should have 5 of them
413+
assertEquals(5, e.getExceptions().size());
414+
}
415+
// even though the onError above throws we should still receive it on the other subscriber
416+
assertEquals(1, ts.getOnErrorEvents().size());
417+
}
370418
}

0 commit comments

Comments
 (0)