Skip to content

Commit 864dc22

Browse files
Merge pull request #1745 from benjchristensen/serialized-subject
SerializedSubject
2 parents 1807575 + 4db0975 commit 864dc22

File tree

2 files changed

+87
-0
lines changed

2 files changed

+87
-0
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subjects;
17+
18+
import rx.Subscriber;
19+
import rx.observers.SerializedObserver;
20+
21+
public class SerializedSubject<T, R> extends Subject<T, R> {
22+
private final SerializedObserver<T> observer;
23+
24+
public SerializedSubject(final Subject<T, R> actual) {
25+
super(new OnSubscribe<R>() {
26+
27+
@Override
28+
public void call(Subscriber<? super R> child) {
29+
actual.unsafeSubscribe(child);
30+
}
31+
32+
});
33+
this.observer = new SerializedObserver<T>(actual);
34+
}
35+
36+
@Override
37+
public void onCompleted() {
38+
observer.onCompleted();
39+
}
40+
41+
@Override
42+
public void onError(Throwable e) {
43+
observer.onError(e);
44+
}
45+
46+
@Override
47+
public void onNext(T t) {
48+
observer.onNext(t);
49+
}
50+
51+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subjects;
17+
18+
import java.util.Arrays;
19+
20+
import org.junit.Test;
21+
22+
import rx.observers.TestSubscriber;
23+
24+
public class SerializedSubjectTest {
25+
26+
@Test
27+
public void testBasic() {
28+
SerializedSubject<String, String> subject = new SerializedSubject<String, String>(PublishSubject.<String> create());
29+
TestSubscriber<String> ts = new TestSubscriber<String>();
30+
subject.subscribe(ts);
31+
subject.onNext("hello");
32+
subject.onCompleted();
33+
ts.awaitTerminalEvent();
34+
ts.assertReceivedOnNext(Arrays.asList("hello"));
35+
}
36+
}

0 commit comments

Comments
 (0)