diff --git a/README.md b/README.md index ecc7e70f..4a7ee040 100644 --- a/README.md +++ b/README.md @@ -110,8 +110,8 @@ public interface Publisher { | [:bulb:](#1.7 "1.7 explained") | *The intent of this rule is to make sure that onError and onComplete are the final states of an interaction between a Publisher and Subscriber pair.* | | 8 | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. | | [:bulb:](#1.8 "1.8 explained") | *The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason for **eventually** is because signals can have propagation delay due to being asynchronous.* | -| 9 | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST [return normally](#term_return_normally), except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). | -| [:bulb:](#1.9 "1.9 explained") | *The intent of this rule is to make sure that `onSubscribe` is always signalled before any of the other signals, so that initialization logic can be executed by the Subscriber when the signal is received. Also `onSubscribe` MUST only be called at most once, [see [2.12](#2.12)]. If the supplied `Subscriber` is `null`, there is nowhere else to signal this but to the caller, which means a `java.lang.NullPointerException` must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a [terminal state](#term_terminal_state).* | +| 9 | `Publisher.subscribe` MUST call `onSubscribe` [synchronously](#term_sync) on the provided `Subscriber`, and do so prior to any other signals to that `Subscriber`, and MUST [return normally](#term_return_normally), except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). | +| [:bulb:](#1.9 "1.9 explained") | *The intent of this rule is to make sure that `onSubscribe` is always signalled before any of the other signals and is called synchronusly with in the `subscribe` method, so that initialization logic can be executed by the Subscriber when the signal is received. Also `onSubscribe` MUST only be called at most once, [see [2.12](#2.12)]. If the supplied `Subscriber` is `null`, there is nowhere else to signal this but to the caller, which means a `java.lang.NullPointerException` must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a [terminal state](#term_terminal_state).* | | 10 | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. | | [:bulb:](#1.10 "1.10 explained") | *The intent of this rule is to have callers of `subscribe` be aware that a generic Publisher and a generic Subscriber cannot be assumed to support being attached multiple times. Furthermore, it also mandates that the semantics of `subscribe` must be upheld no matter how many times it is called.* | | 11 | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. | diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java index 2dee33da..d9750345 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncIterablePublisher.java @@ -18,8 +18,8 @@ import java.util.Iterator; import java.util.Collections; import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; /** * AsyncIterablePublisher is an implementation of Reactive Streams `Publisher` @@ -59,7 +59,6 @@ public void subscribe(final Subscriber s) { // These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls static interface Signal {}; enum Cancel implements Signal { Instance; }; - enum Subscribe implements Signal { Instance; }; enum Send implements Signal { Instance; }; static final class Request implements Signal { final long n; @@ -87,7 +86,7 @@ final class SubscriptionImpl implements Subscription, Runnable { // We are using this `AtomicBoolean` to make sure that this `Subscription` doesn't run concurrently with itself, // which would violate rule 1.3 among others (no concurrent notifications). - private final AtomicBoolean on = new AtomicBoolean(false); + private final AtomicInteger wip = new AtomicInteger(0); // This method will register inbound demand from our `Subscriber` and validate it against rule 3.9 and rule 3.17 private void doRequest(final long n) { @@ -205,25 +204,27 @@ private void signal(final Signal signal) { // This is the main "event loop" if you so will @Override public final void run() { - if(on.get()) { // establishes a happens-before relationship with the end of the previous run - try { - final Signal s = inboundSignals.poll(); // We take a signal off the queue - if (!cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7 + int remainingWork = 1; + for (;;) { - // Below we simply unpack the `Signal`s and invoke the corresponding methods - if (s instanceof Request) - doRequest(((Request)s).n); - else if (s == Send.Instance) - doSend(); - else if (s == Cancel.Instance) - doCancel(); - else if (s == Subscribe.Instance) - doSubscribe(); + Signal s; + while ((s = inboundSignals.poll()) != null) { + if (cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7 + return; } - } finally { - on.set(false); // establishes a happens-before relationship with the beginning of the next run - if(!inboundSignals.isEmpty()) // If we still have signals to process - tryScheduleToExecute(); // Then we try to schedule ourselves to execute again + + // Below we simply unpack the `Signal`s and invoke the corresponding methods + if (s instanceof Request) + doRequest(((Request) s).n); + else if (s == Send.Instance) + doSend(); + else if (s == Cancel.Instance) + doCancel(); + } + + remainingWork = wip.addAndGet(-remainingWork); // establishes a happens-before relationship with the beginning of the next run + if (remainingWork == 0) { + return; } } } @@ -231,20 +232,19 @@ else if (s == Subscribe.Instance) // This method makes sure that this `Subscription` is only running on one Thread at a time, // this is important to make sure that we follow rule 1.3 private final void tryScheduleToExecute() { - if(on.compareAndSet(false, true)) { - try { - executor.execute(this); - } catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully - if (!cancelled) { - doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6 - try { - terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t)); - } finally { - inboundSignals.clear(); // We're not going to need these anymore - // This subscription is cancelled by now, but letting it become schedulable again means - // that we can drain the inboundSignals queue if anything arrives after clearing - on.set(false); - } + if (wip.getAndIncrement() != 0) { // ensure happens-before with already running work + return; + } + + try { + executor.execute(this); + } catch(Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully + if (!cancelled) { + doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6 + try { + terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t)); + } finally { + inboundSignals.clear(); // We're not going to need these anymore } } } @@ -263,7 +263,7 @@ private final void tryScheduleToExecute() { // method is only intended to be invoked once, and immediately after the constructor has // finished. void init() { - signal(Subscribe.Instance); + doSubscribe(); } }; } \ No newline at end of file diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index a14a21eb..d11b83f7 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -306,9 +306,11 @@ public void onSubscribe(Subscription s) { concurrentAccessBarrier.enterSignal(signal); subs = s; - subs.request(1); concurrentAccessBarrier.leaveSignal(signal); + + //request after leave signal since request may be offloaded so it is going to be false positive racing + subs.request(1); } @Override @@ -1099,7 +1101,7 @@ public void onNext(T element) { } } }; - env.subscribe(pub, sub, env.defaultTimeoutMillis()); + env.subscribe(pub, sub); // eventually triggers `onNext`, which will then trigger up to `callsCounter` times `request(Long.MAX_VALUE - 1)` // we're pretty sure to overflow from those diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index f3e435cf..35de99dd 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -297,28 +297,20 @@ public T flopAndFail(String msg) { public void subscribe(Publisher pub, TestSubscriber sub) throws InterruptedException { - subscribe(pub, sub, defaultTimeoutMillis); - } - - public void subscribe(Publisher pub, TestSubscriber sub, long timeoutMillis) throws InterruptedException { pub.subscribe(sub); - sub.subscription.expectCompletion(timeoutMillis, String.format("Could not subscribe %s to Publisher %s", sub, pub)); + sub.subscription.expectCompletion(0, String.format("Could not subscribe %s to Publisher %s", sub, pub)); verifyNoAsyncErrorsNoDelay(); } public ManualSubscriber newBlackholeSubscriber(Publisher pub) throws InterruptedException { ManualSubscriberWithSubscriptionSupport sub = new BlackholeSubscriberWithSubscriptionSupport(this); - subscribe(pub, sub, defaultTimeoutMillis()); + subscribe(pub, sub); return sub; } public ManualSubscriber newManualSubscriber(Publisher pub) throws InterruptedException { - return newManualSubscriber(pub, defaultTimeoutMillis()); - } - - public ManualSubscriber newManualSubscriber(Publisher pub, long timeoutMillis) throws InterruptedException { ManualSubscriberWithSubscriptionSupport sub = new ManualSubscriberWithSubscriptionSupport(this); - subscribe(pub, sub, timeoutMillis); + subscribe(pub, sub); return sub; }