Skip to content

Commit 8072871

Browse files
Merge pull request #1743 from benjchristensen/issue-1685
Subject Error Handling
2 parents b2fe579 + d66df7f commit 8072871

9 files changed

+297
-6
lines changed

src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
*/
1616
package rx.subjects;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
20+
1821
import rx.Observer;
22+
import rx.exceptions.CompositeException;
23+
import rx.exceptions.Exceptions;
1924
import rx.functions.Action1;
2025
import rx.internal.operators.NotificationLite;
2126
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
@@ -104,8 +109,24 @@ public void onCompleted() {
104109
public void onError(final Throwable e) {
105110
if (state.active) {
106111
Object n = nl.error(e);
112+
List<Throwable> errors = null;
107113
for (SubjectObserver<T> bo : state.terminate(n)) {
108-
bo.onError(e);
114+
try {
115+
bo.onError(e);
116+
} catch (Throwable e2) {
117+
if (errors == null) {
118+
errors = new ArrayList<Throwable>();
119+
}
120+
errors.add(e2);
121+
}
122+
}
123+
124+
if (errors != null) {
125+
if (errors.size() == 1) {
126+
Exceptions.propagate(errors.get(0));
127+
} else {
128+
throw new CompositeException("Errors while emitting AsyncSubject.onError", errors);
129+
}
109130
}
110131
}
111132
}

src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,12 @@
1616
package rx.subjects;
1717

1818

19+
import java.util.ArrayList;
20+
import java.util.List;
21+
1922
import rx.Observer;
23+
import rx.exceptions.CompositeException;
24+
import rx.exceptions.Exceptions;
2025
import rx.functions.Action1;
2126
import rx.internal.operators.NotificationLite;
2227
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
@@ -131,8 +136,24 @@ public void onError(Throwable e) {
131136
Object last = state.get();
132137
if (last == null || state.active) {
133138
Object n = nl.error(e);
139+
List<Throwable> errors = null;
134140
for (SubjectObserver<T> bo : state.terminate(n)) {
135-
bo.emitNext(n, state.nl);
141+
try {
142+
bo.emitNext(n, state.nl);
143+
} catch (Throwable e2) {
144+
if (errors == null) {
145+
errors = new ArrayList<Throwable>();
146+
}
147+
errors.add(e2);
148+
}
149+
}
150+
151+
if (errors != null) {
152+
if (errors.size() == 1) {
153+
Exceptions.propagate(errors.get(0));
154+
} else {
155+
throw new CompositeException("Errors while emitting AsyncSubject.onError", errors);
156+
}
136157
}
137158
}
138159
}

src/main/java/rx/subjects/PublishSubject.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,12 @@
1515
*/
1616
package rx.subjects;
1717

18+
import java.util.ArrayList;
19+
import java.util.List;
20+
1821
import rx.Observer;
22+
import rx.exceptions.CompositeException;
23+
import rx.exceptions.Exceptions;
1924
import rx.functions.Action1;
2025
import rx.internal.operators.NotificationLite;
2126
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
@@ -89,8 +94,23 @@ public void onCompleted() {
8994
public void onError(final Throwable e) {
9095
if (state.active) {
9196
Object n = nl.error(e);
97+
List<Throwable> errors = null;
9298
for (SubjectObserver<T> bo : state.terminate(n)) {
93-
bo.emitNext(n, state.nl);
99+
try {
100+
bo.emitNext(n, state.nl);
101+
} catch (Throwable e2) {
102+
if (errors == null) {
103+
errors = new ArrayList<Throwable>();
104+
}
105+
errors.add(e2);
106+
}
107+
}
108+
if (errors != null) {
109+
if (errors.size() == 1) {
110+
Exceptions.propagate(errors.get(0));
111+
} else {
112+
throw new CompositeException("Errors while emitting PublishSubject.onError", errors);
113+
}
94114
}
95115
}
96116
}

src/main/java/rx/subjects/ReplaySubject.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616
package rx.subjects;
1717

1818
import java.util.ArrayList;
19+
import java.util.List;
1920
import java.util.concurrent.TimeUnit;
2021
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2122

2223
import rx.Observer;
2324
import rx.Scheduler;
25+
import rx.exceptions.CompositeException;
26+
import rx.exceptions.Exceptions;
2427
import rx.functions.Action1;
2528
import rx.functions.Func1;
2629
import rx.functions.Functions;
@@ -303,9 +306,25 @@ public void onNext(T t) {
303306
public void onError(final Throwable e) {
304307
if (ssm.active) {
305308
state.error(e);
309+
List<Throwable> errors = null;
306310
for (SubjectObserver<? super T> o : ssm.terminate(NotificationLite.instance().error(e))) {
307-
if (caughtUp(o)) {
308-
o.onError(e);
311+
try {
312+
if (caughtUp(o)) {
313+
o.onError(e);
314+
}
315+
} catch (Throwable e2) {
316+
if (errors == null) {
317+
errors = new ArrayList<Throwable>();
318+
}
319+
errors.add(e2);
320+
}
321+
}
322+
323+
if (errors != null) {
324+
if (errors.size() == 1) {
325+
Exceptions.propagate(errors.get(0));
326+
} else {
327+
throw new CompositeException("Errors while emitting ReplaySubject.onError", errors);
309328
}
310329
}
311330
}

src/test/java/rx/ObservableTests.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,15 @@
4848
import rx.Observable.OnSubscribe;
4949
import rx.Observable.Transformer;
5050
import rx.exceptions.OnErrorNotImplementedException;
51-
import rx.functions.Action0;
5251
import rx.functions.Action1;
5352
import rx.functions.Action2;
5453
import rx.functions.Func1;
5554
import rx.functions.Func2;
5655
import rx.observables.ConnectableObservable;
5756
import rx.observers.TestSubscriber;
5857
import rx.schedulers.TestScheduler;
58+
import rx.subjects.ReplaySubject;
59+
import rx.subjects.Subject;
5960
import rx.subscriptions.BooleanSubscription;
6061

6162
public class ObservableTests {
@@ -1125,5 +1126,21 @@ public String call(Integer t1) {
11251126
ts.assertNoErrors();
11261127
ts.assertReceivedOnNext(Arrays.asList("1", "2", "3"));
11271128
}
1129+
1130+
@Test
1131+
public void testErrorThrownIssue1685() {
1132+
Subject<Object, Object> subject = ReplaySubject.create();
1133+
1134+
Observable.error(new RuntimeException("oops"))
1135+
.materialize()
1136+
.delay(1, TimeUnit.SECONDS)
1137+
.dematerialize()
1138+
.subscribe(subject);
1139+
1140+
subject.subscribe();
1141+
subject.materialize().toBlocking().first();
1142+
1143+
System.out.println("Done");
1144+
}
11281145

11291146
}

src/test/java/rx/subjects/AsyncSubjectTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.subjects;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.fail;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Matchers.anyString;
2122
import static org.mockito.Mockito.inOrder;
@@ -33,7 +34,10 @@
3334

3435
import rx.Observer;
3536
import rx.Subscription;
37+
import rx.exceptions.CompositeException;
38+
import rx.exceptions.OnErrorNotImplementedException;
3639
import rx.functions.Action1;
40+
import rx.observers.TestSubscriber;
3741

3842
public class AsyncSubjectTest {
3943

@@ -281,5 +285,49 @@ public void run() {
281285
}
282286
}
283287
}
288+
289+
@Test
290+
public void testOnErrorThrowsDoesntPreventDelivery() {
291+
AsyncSubject<String> ps = AsyncSubject.create();
292+
293+
ps.subscribe();
294+
TestSubscriber<String> ts = new TestSubscriber<String>();
295+
ps.subscribe(ts);
296+
297+
try {
298+
ps.onError(new RuntimeException("an exception"));
299+
fail("expect OnErrorNotImplementedException");
300+
} catch (OnErrorNotImplementedException e) {
301+
// ignore
302+
}
303+
// even though the onError above throws we should still receive it on the other subscriber
304+
assertEquals(1, ts.getOnErrorEvents().size());
305+
}
306+
307+
/**
308+
* This one has multiple failures so should get a CompositeException
309+
*/
310+
@Test
311+
public void testOnErrorThrowsDoesntPreventDelivery2() {
312+
AsyncSubject<String> ps = AsyncSubject.create();
313+
314+
ps.subscribe();
315+
ps.subscribe();
316+
TestSubscriber<String> ts = new TestSubscriber<String>();
317+
ps.subscribe(ts);
318+
ps.subscribe();
319+
ps.subscribe();
320+
ps.subscribe();
321+
322+
try {
323+
ps.onError(new RuntimeException("an exception"));
324+
fail("expect OnErrorNotImplementedException");
325+
} catch (CompositeException e) {
326+
// we should have 5 of them
327+
assertEquals(5, e.getExceptions().size());
328+
}
329+
// even though the onError above throws we should still receive it on the other subscriber
330+
assertEquals(1, ts.getOnErrorEvents().size());
331+
}
284332

285333
}

src/test/java/rx/subjects/BehaviorSubjectTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.subjects;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.fail;
1920
import static org.mockito.Matchers.any;
2021
import static org.mockito.Mockito.inOrder;
2122
import static org.mockito.Mockito.mock;
@@ -30,7 +31,10 @@
3031
import rx.Observable;
3132
import rx.Observer;
3233
import rx.Subscription;
34+
import rx.exceptions.CompositeException;
35+
import rx.exceptions.OnErrorNotImplementedException;
3336
import rx.functions.Func1;
37+
import rx.observers.TestSubscriber;
3438

3539
public class BehaviorSubjectTest {
3640

@@ -367,4 +371,48 @@ public void testTakeOneSubscriber() {
367371

368372
assertEquals(0, source.subscriberCount());
369373
}
374+
375+
@Test
376+
public void testOnErrorThrowsDoesntPreventDelivery() {
377+
BehaviorSubject<String> ps = BehaviorSubject.create();
378+
379+
ps.subscribe();
380+
TestSubscriber<String> ts = new TestSubscriber<String>();
381+
ps.subscribe(ts);
382+
383+
try {
384+
ps.onError(new RuntimeException("an exception"));
385+
fail("expect OnErrorNotImplementedException");
386+
} catch (OnErrorNotImplementedException e) {
387+
// ignore
388+
}
389+
// even though the onError above throws we should still receive it on the other subscriber
390+
assertEquals(1, ts.getOnErrorEvents().size());
391+
}
392+
393+
/**
394+
* This one has multiple failures so should get a CompositeException
395+
*/
396+
@Test
397+
public void testOnErrorThrowsDoesntPreventDelivery2() {
398+
BehaviorSubject<String> ps = BehaviorSubject.create();
399+
400+
ps.subscribe();
401+
ps.subscribe();
402+
TestSubscriber<String> ts = new TestSubscriber<String>();
403+
ps.subscribe(ts);
404+
ps.subscribe();
405+
ps.subscribe();
406+
ps.subscribe();
407+
408+
try {
409+
ps.onError(new RuntimeException("an exception"));
410+
fail("expect OnErrorNotImplementedException");
411+
} catch (CompositeException e) {
412+
// we should have 5 of them
413+
assertEquals(5, e.getExceptions().size());
414+
}
415+
// even though the onError above throws we should still receive it on the other subscriber
416+
assertEquals(1, ts.getOnErrorEvents().size());
417+
}
370418
}

0 commit comments

Comments
 (0)