Skip to content

Commit e218bf5

Browse files
Merge pull request #483 from benjchristensen/onEach-fix
DoOn Tweaks
2 parents ac650b5 + 470bb89 commit e218bf5

File tree

2 files changed

+115
-10
lines changed

2 files changed

+115
-10
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5023,33 +5023,58 @@ public void onNext(T args) {
50235023

50245024
return create(OperationDoOnEach.doOnEach(this, observer));
50255025
}
5026+
5027+
/**
5028+
* Invokes an action if onError is emitted from the observable sequence.
5029+
*
5030+
* @param onError
5031+
* The action to invoke if onError is invoked.
5032+
*
5033+
* @return
5034+
* The source sequence with the side-effecting behavior applied.
5035+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804(v=vs.103).aspx">MSDN: Observable.Do</a>
5036+
*/
5037+
public Observable<T> doOnError(final Action1<Throwable> onError) {
5038+
Observer<T> observer = new Observer<T>() {
5039+
@Override
5040+
public void onCompleted() {}
5041+
5042+
@Override
5043+
public void onError(Throwable e) {
5044+
onError.call(e);
5045+
}
50265046

5047+
@Override
5048+
public void onNext(T args) { }
5049+
5050+
};
5051+
5052+
5053+
return create(OperationDoOnEach.doOnEach(this, observer));
5054+
}
5055+
50275056
/**
5028-
* Invokes an action for each element in the observable sequence.
5057+
* Invokes an action when onCompleted is emitted from the observable sequence.
50295058
*
5030-
* @param onNext
5031-
* The action to invoke for each element in the source sequence.
50325059
* @param onCompleted
5033-
* The action to invoke when the source sequence is completed.
5060+
* The action to invoke when onCompleted is emitted.
50345061
*
50355062
* @return
50365063
* The source sequence with the side-effecting behavior applied.
5037-
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229659(v=vs.103).aspx">MSDN: Observable.Do</a>
5064+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229804(v=vs.103).aspx">MSDN: Observable.Do</a>
50385065
*/
5039-
public Observable<T> doOnEach(final Action1<T> onNext, final Action0 onCompleted) {
5066+
public Observable<T> doOnCompleted(final Action0 onCompleted) {
50405067
Observer<T> observer = new Observer<T>() {
50415068
@Override
50425069
public void onCompleted() {
50435070
onCompleted.call();
50445071
}
50455072

50465073
@Override
5047-
public void onError(Throwable e) {}
5074+
public void onError(Throwable e) { }
50485075

50495076
@Override
5050-
public void onNext(T args) {
5051-
onNext.call(args);
5052-
}
5077+
public void onNext(T args) { }
50535078

50545079
};
50555080

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
import java.util.concurrent.atomic.AtomicReference;
22+
23+
import org.junit.Test;
24+
25+
import rx.util.functions.Action0;
26+
import rx.util.functions.Action1;
27+
28+
public class ObservableDoOnTest {
29+
30+
@Test
31+
public void testDoOnEach() {
32+
final AtomicReference<String> r = new AtomicReference<String>();
33+
String output = Observable.from("one").doOnEach(new Action1<String>() {
34+
35+
@Override
36+
public void call(String v) {
37+
r.set(v);
38+
}
39+
}).toBlockingObservable().single();
40+
41+
assertEquals("one", output);
42+
assertEquals("one", r.get());
43+
}
44+
45+
@Test
46+
public void testDoOnError() {
47+
final AtomicReference<Throwable> r = new AtomicReference<Throwable>();
48+
Throwable t = null;
49+
try {
50+
Observable.<String> error(new RuntimeException("an error")).doOnError(new Action1<Throwable>() {
51+
52+
@Override
53+
public void call(Throwable v) {
54+
r.set(v);
55+
}
56+
}).toBlockingObservable().single();
57+
fail("expected exception, not a return value");
58+
} catch (Throwable e) {
59+
t = e;
60+
}
61+
62+
assertNotNull(t);
63+
assertEquals(t, r.get());
64+
}
65+
66+
@Test
67+
public void testDoOnCompleted() {
68+
final AtomicBoolean r = new AtomicBoolean();
69+
String output = Observable.from("one").doOnCompleted(new Action0() {
70+
71+
@Override
72+
public void call() {
73+
r.set(true);
74+
}
75+
}).toBlockingObservable().single();
76+
77+
assertEquals("one", output);
78+
assertTrue(r.get());
79+
}
80+
}

0 commit comments

Comments
 (0)