Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit e8dd4ed

Browse files
committedJul 17, 2015
Revert "cache now supports backpressure"
This reverts commit 18ff5af.
1 parent 5411086 commit e8dd4ed

File tree

7 files changed

+242
-871
lines changed

7 files changed

+242
-871
lines changed
 

‎src/main/java/rx/Observable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3594,7 +3594,7 @@ public final <B> Observable<List<T>> buffer(Observable<B> boundary, int initialC
35943594
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
35953595
*/
35963596
public final Observable<T> cache() {
3597-
return CachedObservable.from(this);
3597+
return create(new OnSubscribeCache<T>(this));
35983598
}
35993599

36003600
/**
@@ -3629,7 +3629,7 @@ public final Observable<T> cache() {
36293629
* @see <a href="http://reactivex.io/documentation/operators/replay.html">ReactiveX operators documentation: Replay</a>
36303630
*/
36313631
public final Observable<T> cache(int capacityHint) {
3632-
return CachedObservable.from(this, capacityHint);
3632+
return create(new OnSubscribeCache<T>(this, capacityHint));
36333633
}
36343634

36353635
/**
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19+
20+
import rx.Observable;
21+
import rx.Observable.OnSubscribe;
22+
import rx.Subscriber;
23+
import rx.subjects.ReplaySubject;
24+
import rx.subjects.Subject;
25+
26+
/**
27+
* This method has similar behavior to {@link Observable#replay()} except that this auto-subscribes
28+
* to the source Observable rather than returning a connectable Observable.
29+
* <p>
30+
* <img width="640" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/cache.png" alt="">
31+
* <p>
32+
* This is useful with an Observable that you want to cache responses when you can't control the
33+
* subscribe/unsubscribe behavior of all the Observers.
34+
* <p>
35+
* <em>Note:</em> You sacrifice the ability to unsubscribe from the origin when you use this operator, so be
36+
* careful not to use this operator on Observables that emit infinite or very large numbers of
37+
* items, as this will use up memory.
38+
*
39+
* @param <T>
40+
* the cached value type
41+
*/
42+
public final class OnSubscribeCache<T> implements OnSubscribe<T> {
43+
protected final Observable<? extends T> source;
44+
protected final Subject<? super T, ? extends T> cache;
45+
volatile int sourceSubscribed;
46+
@SuppressWarnings("rawtypes")
47+
static final AtomicIntegerFieldUpdater<OnSubscribeCache> SRC_SUBSCRIBED_UPDATER
48+
= AtomicIntegerFieldUpdater.newUpdater(OnSubscribeCache.class, "sourceSubscribed");
49+
50+
public OnSubscribeCache(Observable<? extends T> source) {
51+
this(source, ReplaySubject.<T> create());
52+
}
53+
54+
public OnSubscribeCache(Observable<? extends T> source, int capacity) {
55+
this(source, ReplaySubject.<T> create(capacity));
56+
}
57+
58+
/* accessible to tests */OnSubscribeCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
59+
this.source = source;
60+
this.cache = cache;
61+
}
62+
63+
@Override
64+
public void call(Subscriber<? super T> s) {
65+
if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
66+
source.subscribe(cache);
67+
/*
68+
* Note that we will never unsubscribe from 'source' unless we receive `onCompleted` or `onError`,
69+
* as we want to receive and cache all of its values.
70+
*
71+
* This means this should never be used on an infinite or very large sequence, similar to toList().
72+
*/
73+
}
74+
cache.unsafeSubscribe(s);
75+
}
76+
}

‎src/main/java/rx/internal/util/CachedObservable.java

Lines changed: 0 additions & 432 deletions
This file was deleted.

‎src/main/java/rx/internal/util/LinkedArrayList.java

Lines changed: 0 additions & 136 deletions
This file was deleted.
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.fail;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
23+
24+
import java.util.Arrays;
25+
import java.util.concurrent.CountDownLatch;
26+
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicInteger;
28+
29+
import org.junit.Test;
30+
31+
import rx.Observable;
32+
import rx.Subscriber;
33+
import rx.functions.Action0;
34+
import rx.functions.Action1;
35+
import rx.functions.Func1;
36+
import rx.functions.Func2;
37+
import rx.observers.TestSubscriber;
38+
import rx.schedulers.Schedulers;
39+
import rx.subjects.AsyncSubject;
40+
import rx.subjects.BehaviorSubject;
41+
import rx.subjects.PublishSubject;
42+
import rx.subjects.ReplaySubject;
43+
import rx.subjects.Subject;
44+
45+
public class OnSubscribeCacheTest {
46+
47+
@Test
48+
public void testCache() throws InterruptedException {
49+
final AtomicInteger counter = new AtomicInteger();
50+
Observable<String> o = Observable.create(new Observable.OnSubscribe<String>() {
51+
52+
@Override
53+
public void call(final Subscriber<? super String> observer) {
54+
new Thread(new Runnable() {
55+
56+
@Override
57+
public void run() {
58+
counter.incrementAndGet();
59+
System.out.println("published observable being executed");
60+
observer.onNext("one");
61+
observer.onCompleted();
62+
}
63+
}).start();
64+
}
65+
}).cache();
66+
67+
// we then expect the following 2 subscriptions to get that same value
68+
final CountDownLatch latch = new CountDownLatch(2);
69+
70+
// subscribe once
71+
o.subscribe(new Action1<String>() {
72+
73+
@Override
74+
public void call(String v) {
75+
assertEquals("one", v);
76+
System.out.println("v: " + v);
77+
latch.countDown();
78+
}
79+
});
80+
81+
// subscribe again
82+
o.subscribe(new Action1<String>() {
83+
84+
@Override
85+
public void call(String v) {
86+
assertEquals("one", v);
87+
System.out.println("v: " + v);
88+
latch.countDown();
89+
}
90+
});
91+
92+
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
93+
fail("subscriptions did not receive values");
94+
}
95+
assertEquals(1, counter.get());
96+
}
97+
98+
private void testWithCustomSubjectAndRepeat(Subject<Integer, Integer> subject, Integer... expected) {
99+
Observable<Integer> source0 = Observable.just(1, 2, 3)
100+
.subscribeOn(Schedulers.io())
101+
.flatMap(new Func1<Integer, Observable<Integer>>() {
102+
@Override
103+
public Observable<Integer> call(final Integer i) {
104+
return Observable.timer(i * 20, TimeUnit.MILLISECONDS).map(new Func1<Long, Integer>() {
105+
@Override
106+
public Integer call(Long t1) {
107+
return i;
108+
}
109+
});
110+
}
111+
});
112+
113+
Observable<Integer> source1 = Observable.create(new OnSubscribeCache<Integer>(source0, subject));
114+
115+
Observable<Integer> source2 = source1
116+
.repeat(4)
117+
.zipWith(Observable.interval(0, 10, TimeUnit.MILLISECONDS, Schedulers.newThread()), new Func2<Integer, Long, Integer>() {
118+
@Override
119+
public Integer call(Integer t1, Long t2) {
120+
return t1;
121+
}
122+
123+
});
124+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
125+
source2.subscribe(ts);
126+
127+
ts.awaitTerminalEvent();
128+
ts.assertNoErrors();
129+
System.out.println(ts.getOnNextEvents());
130+
ts.assertReceivedOnNext(Arrays.asList(expected));
131+
}
132+
133+
@Test(timeout = 10000)
134+
public void testWithAsyncSubjectAndRepeat() {
135+
testWithCustomSubjectAndRepeat(AsyncSubject.<Integer> create(), 3, 3, 3, 3);
136+
}
137+
138+
@Test(timeout = 10000)
139+
public void testWithBehaviorSubjectAndRepeat() {
140+
// BehaviorSubject just completes when repeated
141+
testWithCustomSubjectAndRepeat(BehaviorSubject.create(0), 0, 1, 2, 3);
142+
}
143+
144+
@Test(timeout = 10000)
145+
public void testWithPublishSubjectAndRepeat() {
146+
// PublishSubject just completes when repeated
147+
testWithCustomSubjectAndRepeat(PublishSubject.<Integer> create(), 1, 2, 3);
148+
}
149+
150+
@Test
151+
public void testWithReplaySubjectAndRepeat() {
152+
testWithCustomSubjectAndRepeat(ReplaySubject.<Integer> create(), 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3);
153+
}
154+
155+
@Test
156+
public void testUnsubscribeSource() {
157+
Action0 unsubscribe = mock(Action0.class);
158+
Observable<Integer> o = Observable.just(1).doOnUnsubscribe(unsubscribe).cache();
159+
o.subscribe();
160+
o.subscribe();
161+
o.subscribe();
162+
verify(unsubscribe, times(1)).call();
163+
}
164+
}

‎src/test/java/rx/internal/util/CachedObservableTest.java

Lines changed: 0 additions & 264 deletions
This file was deleted.

‎src/test/java/rx/internal/util/LinkedArrayListTest.java

Lines changed: 0 additions & 37 deletions
This file was deleted.

0 commit comments

Comments
 (0)
Please sign in to comment.