Skip to content

Commit 15e6444

Browse files
Merge pull request #2788 from zsxwing/publish-no-subscriber
Fix the bug that 'publish' will cache items when no subscriber
2 parents 15d030b + 0fd9c76 commit 15e6444

File tree

2 files changed

+39
-6
lines changed

2 files changed

+39
-6
lines changed

src/main/java/rx/internal/operators/OperatorPublish.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,10 @@ public synchronized boolean canEmitWithDecrement() {
213213
return false;
214214
}
215215

216+
public synchronized boolean hasNoSubscriber() {
217+
return subscribers.length == 0;
218+
}
219+
216220
public synchronized void incrementOutstandingAfterFailedEmit() {
217221
outstandingRequests++;
218222
}
@@ -308,11 +312,13 @@ public void emit(Object t) throws MissingBackpressureException {
308312
}
309313

310314
private void requestMoreAfterEmission(int emitted) {
311-
OriginSubscriber<T> origin = state.getOrigin();
312-
if (emitted > 0 && origin != null) {
313-
long r = origin.originOutstanding.addAndGet(-emitted);
314-
if (r <= origin.THRESHOLD) {
315-
origin.requestMore(RxRingBuffer.SIZE - origin.THRESHOLD);
315+
if (emitted > 0) {
316+
OriginSubscriber<T> origin = state.getOrigin();
317+
if (origin != null) {
318+
long r = origin.originOutstanding.addAndGet(-emitted);
319+
if (r <= origin.THRESHOLD) {
320+
origin.requestMore(RxRingBuffer.SIZE - origin.THRESHOLD);
321+
}
316322
}
317323
}
318324
}
@@ -336,8 +342,18 @@ public void drainQueue(OriginSubscriber<T> originSubscriber) {
336342
* If we want to batch this then we need to account for new subscribers arriving with a lower request count
337343
* concurrently while iterating the batch ... or accept that they won't
338344
*/
339-
340345
while (true) {
346+
if (localState.hasNoSubscriber()) {
347+
// Drop items due to no subscriber
348+
if (localBuffer.poll() == null) {
349+
// Exit due to no more item
350+
break;
351+
} else {
352+
// Keep dropping cached items.
353+
continue;
354+
}
355+
}
356+
341357
boolean shouldEmit = localState.canEmitWithDecrement();
342358
if (!shouldEmit) {
343359
break;

src/test/java/rx/internal/operators/OperatorPublishTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import rx.observables.ConnectableObservable;
3131
import rx.observers.TestSubscriber;
3232
import rx.schedulers.Schedulers;
33+
import rx.schedulers.TestScheduler;
3334

3435
public class OperatorPublishTest {
3536

@@ -242,4 +243,20 @@ public void call() {
242243

243244
assertEquals(8, sourceEmission.get());
244245
}
246+
247+
@Test
248+
public void testConnectWithNoSubscriber() {
249+
TestScheduler scheduler = new TestScheduler();
250+
ConnectableObservable<Long> co = Observable.timer(10, 10, TimeUnit.MILLISECONDS, scheduler).take(3).publish();
251+
co.connect();
252+
// Emit 0
253+
scheduler.advanceTimeBy(15, TimeUnit.MILLISECONDS);
254+
TestSubscriber subscriber = new TestSubscriber<Long>();
255+
co.subscribe(subscriber);
256+
// Emit 1 and 2
257+
scheduler.advanceTimeBy(50, TimeUnit.MILLISECONDS);
258+
subscriber.assertReceivedOnNext(Arrays.asList(1L, 2L));
259+
subscriber.assertNoErrors();
260+
subscriber.assertTerminalEvent();
261+
}
245262
}

0 commit comments

Comments
 (0)