From e1cda5f6271bece144aa7d65ccb973d7fe46af8c Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 3 Apr 2017 15:50:59 +0200 Subject: [PATCH 1/4] Enable replay-like and delay-error like Processor impls --- .../tck/IdentityProcessorVerification.java | 60 +++- .../reactivestreams/tck/TestEnvironment.java | 9 + .../reactivestreams/tck/ReplayProcessor.java | 336 ++++++++++++++++++ ...layProcessorDelayErrorAsProcessorTest.java | 48 +++ ...eplayProcessorFailFastAsProcessorTest.java | 44 +++ 5 files changed, 492 insertions(+), 5 deletions(-) create mode 100644 tck/src/test/java/org/reactivestreams/tck/ReplayProcessor.java create mode 100644 tck/src/test/java/org/reactivestreams/tck/ReplayProcessorDelayErrorAsProcessorTest.java create mode 100644 tck/src/test/java/org/reactivestreams/tck/ReplayProcessorFailFastAsProcessorTest.java diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 025858c9..105fb367 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -11,6 +11,8 @@ package org.reactivestreams.tck; +import java.util.*; + import org.reactivestreams.Processor; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -25,9 +27,6 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.HashSet; -import java.util.Set; - public abstract class IdentityProcessorVerification extends WithHelperPublisher implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules { @@ -189,6 +188,19 @@ public long maxSupportedSubscribers() { return Long.MAX_VALUE; } + /** + * Indicates that the tested implementation keeps a strict event ordering in respect to + * {@code onNext} and {@code onError}. + * Some {@code Processor} implementation may emit all {@code onNext}s received before emitting + * any {@code onError}, similar to how {@code onComplete} is usually emitted after all + * previous {@code onNext} items have been emitted. The default implementation returns false, + * indicating that {@code onError} may cut ahead and get emitted even if there are + * {@code onNext} events ready for consumption (via {@code request()}) by the {@code Subscriber}. + */ + public boolean strictEventOrdering() { + return false; + } + ////////////////////// TEST ENV CLEANUP ///////////////////////////////////// @BeforeMethod @@ -417,6 +429,14 @@ public TestSetup apply(Long aLong) throws Throwable { final Exception ex = new RuntimeException("Test exception"); sendError(ex); sub1.expectError(ex); + + // some Processors may only emit the terminal error if + // all previously submitted onNext value has been + // consumed by the Subscriber + if (strictEventOrdering()) { + sub2.request(1); + expectNextElement(sub2, x); + } sub2.expectError(ex); env.verifyNoAsyncErrorsNoDelay(); @@ -668,7 +688,10 @@ public TestSetup apply(Long subscribers) throws Throwable { sub2.expectNone(); // since sub2 hasn't requested anything yet sub2.request(1); - expectNextElement(sub2, z); + // some processors could emit items from the beginning, some may + // cache for a limited time or count, therefore + // any of the first 3 items may appear after requesing one + expectAnyNextElement(sub2, Arrays.asList(x, y, z)); if (totalRequests == 3) { expectRequest(); @@ -677,7 +700,20 @@ public TestSetup apply(Long subscribers) throws Throwable { // to avoid error messages during test harness shutdown sendCompletion(); sub1.expectCompletion(env.defaultTimeoutMillis()); - sub2.expectCompletion(env.defaultTimeoutMillis()); + + // sub2 may not complete because it still has pending y or z + if (!sub2.tryExpectCompletion(env.defaultTimeoutMillis())) { + sub2.request(1); + expectAnyNextElement(sub2, Arrays.asList(y, z)); + + // z may still be pending + if (!sub2.tryExpectCompletion(env.defaultTimeoutMillis())) { + sub2.request(1); + expectNextElement(sub2, z); + // after z, it should get onComplete reasonably quickly + sub2.expectCompletion(env.defaultTimeoutMillis()); + } + } env.verifyNoAsyncErrorsNoDelay(); }}; @@ -738,6 +774,20 @@ public void expectNextElement(ManualSubscriber sub, T expected) throws Interr } } + public void expectAnyNextElement(ManualSubscriber sub, Collection expected) throws InterruptedException { + final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected)); + if (!expected.contains(elem)) { + StringBuilder sb = new StringBuilder(); + for (T t : expected) { + if (sb.length() > 0) { + sb.append(", "); + } + sb.append(String.format("`onNext(%s)`", t)); + } + env.flop(String.format("Received `onNext(%s)` on downstream but expected any of %s", elem, sb)); + } + } + public T sendNextTFromUpstream() throws InterruptedException { final T x = nextT(); sendNext(x); diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index 6b8456f6..7f1afbdc 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -506,6 +506,10 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru received.expectCompletion(timeoutMillis, errorMsg); } + public boolean tryExpectCompletion(long timeoutMillis) throws InterruptedException { + return received.tryExpectCompletion(timeoutMillis); + } + public void expectErrorWithMessage(Class expected, String requiredMessagePart) throws Exception { expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis()); } @@ -973,6 +977,11 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru } // else, ok } + public boolean tryExpectCompletion(long timeoutMillis) throws InterruptedException { + Optional value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); + return value != null && !value.isDefined(); + } + @SuppressWarnings("unchecked") public E expectError(Class clazz, long timeoutMillis, String errorMsg) throws Exception { Thread.sleep(timeoutMillis); diff --git a/tck/src/test/java/org/reactivestreams/tck/ReplayProcessor.java b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessor.java new file mode 100644 index 00000000..ee9a57ec --- /dev/null +++ b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessor.java @@ -0,0 +1,336 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck; + +import java.util.*; +import java.util.concurrent.atomic.*; + +import org.reactivestreams.*; + +final class ReplayProcessor implements Processor { + + final AtomicReference upstream; + + final List list; + + final boolean delayError; + + final AtomicReference[]> subscribers; + + final AtomicReference error; + + volatile int size; + + volatile boolean done; + + @SuppressWarnings("rawtypes") + static final ReplaySubscription[] EMPTY = new ReplaySubscription[0]; + + @SuppressWarnings("rawtypes") + static final ReplaySubscription[] TERMINATED = new ReplaySubscription[0]; + + static final Subscription CANCELLED = new EmptySubscription(); + + ReplayProcessor(boolean delayError) { + this.delayError = delayError; + this.list = new ArrayList(); + this.upstream = new AtomicReference(); + this.subscribers = new AtomicReference[]>(EMPTY); + this.error = new AtomicReference(); + } + + @Override + public void onSubscribe(Subscription s) { + if (upstream.compareAndSet(null, s)) { + s.request(Long.MAX_VALUE); + } else { + s.cancel(); + } + } + + @Override + public void onNext(T t) { + if (t == null) { + throw new NullPointerException(); + } + + list.add(t); + size++; + for (ReplaySubscription rs : subscribers.get()) { + replay(rs); + } + } + + @SuppressWarnings("unchecked") + @Override + public void onError(Throwable t) { + if (t == null) { + throw new NullPointerException(); + } + if (error.compareAndSet(null, t)) { + done = true; + for (ReplaySubscription rs : subscribers.getAndSet(TERMINATED)) { + replay(rs); + } + } + } + + @SuppressWarnings("unchecked") + @Override + public void onComplete() { + done = true; + for (ReplaySubscription rs : subscribers.getAndSet(TERMINATED)) { + replay(rs); + } + } + + @Override + public void subscribe(Subscriber s) { + ReplaySubscription rs = new ReplaySubscription(s, this); + s.onSubscribe(rs); + if (add(rs)) { + if (rs.isCancelled()) { + remove(rs); + return; + } + } + replay(rs); + } + + public void replay(ReplaySubscription rs) { + if (rs.getAndIncrement() != 0) { + return; + } + + int missed = 1; + Subscriber a = rs.actual; + List list = this.list; + int idx = rs.index; + AtomicLong req = rs.requested; + + for (;;) { + + long r = req.get(); + + while (idx != r) { + if (req.get() == Long.MIN_VALUE) { + return; + } + + if (rs.badRequest) { + a.onError(new IllegalArgumentException("§3.9 violated: positive request amount required")); + return; + } + + boolean d = done; + + if (d && !delayError) { + Throwable ex = error.get(); + if (ex != null) { + a.onError(ex); + return; + } + } + + boolean empty = idx >= size; + + if (d && empty) { + Throwable ex = error.get(); + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + + if (empty) { + break; + } + + a.onNext(list.get(idx++)); + } + + if (idx == r) { + if (req.get() == Long.MIN_VALUE) { + return; + } + + if (rs.badRequest) { + a.onError(new IllegalArgumentException("§3.9 violated: positive request amount required")); + return; + } + + boolean d = done; + + if (d && !delayError) { + Throwable ex = error.get(); + if (ex != null) { + a.onError(ex); + return; + } + } + + boolean empty = idx >= size; + if (d && empty) { + Throwable ex = error.get(); + if (ex != null) { + a.onError(ex); + } else { + a.onComplete(); + } + return; + } + } + + int w = rs.get(); + if (w == missed) { + rs.index = idx; + missed = rs.addAndGet(-missed); + if (missed == 0) { + break; + } + } else { + missed = w; + } + } + } + + boolean add(ReplaySubscription rs) { + for (;;) { + ReplaySubscription[] a = subscribers.get(); + if (a == TERMINATED) { + return false; + } + int n = a.length; + @SuppressWarnings("unchecked") + ReplaySubscription[] b = new ReplaySubscription[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = rs; + + if (subscribers.compareAndSet(a, b)) { + return true; + } + } + } + + void remove(ReplaySubscription rs) { + for (;;) { + ReplaySubscription[] a = subscribers.get(); + int n = a.length; + if (n == 0) { + break; + } + + int j = -1; + + for (int i = 0; i < n; i++) { + if (a[i] == rs) { + j = i; + break; + } + } + + if (j < 0) { + break; + } + if (n == 1) { + @SuppressWarnings("unchecked") + ReplaySubscription[] b = TERMINATED; + if (subscribers.compareAndSet(a, b)) { + Subscription s = upstream.getAndSet(CANCELLED); + if (s != null) { + s.cancel(); + } + break; + } + } else { + @SuppressWarnings("unchecked") + ReplaySubscription[] b = new ReplaySubscription[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + if (subscribers.compareAndSet(a, b)) { + break; + } + } + } + } + + static final class ReplaySubscription + extends AtomicInteger + implements Subscription { + + /** */ + private static final long serialVersionUID = -3704758100845141134L; + + final ReplayProcessor parent; + + final Subscriber actual; + + final AtomicLong requested; + + long emitted; + + int index; + + volatile boolean badRequest; + + ReplaySubscription(Subscriber actual, ReplayProcessor parent) { + this.actual = actual; + this.requested = new AtomicLong(); + this.parent = parent; + } + + @Override + public void request(long n) { + if (n <= 0L) { + badRequest = true; + parent.replay(this); + } else { + for (;;) { + long r = requested.get(); + if (r == Long.MIN_VALUE) { + break; + } + long u = r + n; + if (u < 0L) { + u = Long.MAX_VALUE; + } + if (requested.compareAndSet(r, u)) { + parent.replay(this); + break; + } + } + } + } + + @Override + public void cancel() { + if (requested.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) { + parent.remove(this); + } + } + + public boolean isCancelled() { + return requested.get() == Long.MIN_VALUE; + } + } + + static final class EmptySubscription implements Subscription { + @Override + public void request(long n) { + } + @Override + public void cancel() { + } + } +} diff --git a/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorDelayErrorAsProcessorTest.java b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorDelayErrorAsProcessorTest.java new file mode 100644 index 00000000..40ec742f --- /dev/null +++ b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorDelayErrorAsProcessorTest.java @@ -0,0 +1,48 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck; + +import java.util.concurrent.*; + +import org.reactivestreams.*; + +public class ReplayProcessorDelayErrorAsProcessorTest extends IdentityProcessorVerification { + + public ReplayProcessorDelayErrorAsProcessorTest() { + super(new TestEnvironment()); + } + + @Override + public Processor createIdentityProcessor(int bufferSize) { + return new ReplayProcessor(true); + } + + @Override + public Publisher createFailedPublisher() { + return null; + } + + @Override + public ExecutorService publisherExecutorService() { + return Executors.newCachedThreadPool(); + } + + @Override + public Integer createElement(int element) { + return element; + } + + @Override + public boolean strictEventOrdering() { + return true; + } +} diff --git a/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorFailFastAsProcessorTest.java b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorFailFastAsProcessorTest.java new file mode 100644 index 00000000..8a260c7f --- /dev/null +++ b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorFailFastAsProcessorTest.java @@ -0,0 +1,44 @@ +/************************************************************************ + * Licensed under Public Domain (CC0) * + * * + * To the extent possible under law, the person who associated CC0 with * + * this code has waived all copyright and related or neighboring * + * rights to this code. * + * * + * You should have received a copy of the CC0 legalcode along with this * + * work. If not, see .* + ************************************************************************/ + +package org.reactivestreams.tck; + +import java.util.concurrent.*; + +import org.reactivestreams.*; + +public class ReplayProcessorFailFastAsProcessorTest extends IdentityProcessorVerification { + + public ReplayProcessorFailFastAsProcessorTest() { + super(new TestEnvironment()); + } + + @Override + public Processor createIdentityProcessor(int bufferSize) { + return new ReplayProcessor(false); + } + + @Override + public Publisher createFailedPublisher() { + return null; + } + + @Override + public ExecutorService publisherExecutorService() { + return Executors.newCachedThreadPool(); + } + + @Override + public Integer createElement(int element) { + return element; + } + +} From 32027724d2c8e38b87f01154efab6dd7b0670f1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Mon, 3 Apr 2017 17:39:35 +0200 Subject: [PATCH 2/4] Don't consume a non-terminal signal --- .../org/reactivestreams/tck/TestEnvironment.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index 7f1afbdc..58ade05a 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -978,8 +978,18 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru } public boolean tryExpectCompletion(long timeoutMillis) throws InterruptedException { - Optional value = abq.poll(timeoutMillis, TimeUnit.MILLISECONDS); - return value != null && !value.isDefined(); + while (timeoutMillis-- > 0) { + Optional value = abq.peek(); + if (value != null) { + if (!value.isDefined()) { + abq.poll(); + return true; + } + return false; + } + Thread.sleep(1); + } + return false; } @SuppressWarnings("unchecked") From c2c54bc2eb3d0e87a36f7e9191d1b05fc7b2d6dd Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 4 Apr 2017 10:45:12 +0200 Subject: [PATCH 3/4] Don't assume sleep(1) actually sleeps 1 millisecond --- .../main/java/org/reactivestreams/tck/TestEnvironment.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index 58ade05a..e42c8b02 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -978,7 +978,8 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru } public boolean tryExpectCompletion(long timeoutMillis) throws InterruptedException { - while (timeoutMillis-- > 0) { + long end = System.currentTimeMillis() + timeoutMillis; + do { Optional value = abq.peek(); if (value != null) { if (!value.isDefined()) { @@ -988,7 +989,7 @@ public boolean tryExpectCompletion(long timeoutMillis) throws InterruptedExcepti return false; } Thread.sleep(1); - } + } while (System.currentTimeMillis() < end); return false; } From eddb80fbbfc96c7e114af6a0b287cdb895a2008b Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 4 Apr 2017 11:16:59 +0200 Subject: [PATCH 4/4] Use Set with expectAnyNextElement() --- .../reactivestreams/tck/IdentityProcessorVerification.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index 105fb367..7ef86a6c 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -691,7 +691,7 @@ public TestSetup apply(Long subscribers) throws Throwable { // some processors could emit items from the beginning, some may // cache for a limited time or count, therefore // any of the first 3 items may appear after requesing one - expectAnyNextElement(sub2, Arrays.asList(x, y, z)); + expectAnyNextElement(sub2, new HashSet(Arrays.asList(x, y, z))); if (totalRequests == 3) { expectRequest(); @@ -704,7 +704,7 @@ public TestSetup apply(Long subscribers) throws Throwable { // sub2 may not complete because it still has pending y or z if (!sub2.tryExpectCompletion(env.defaultTimeoutMillis())) { sub2.request(1); - expectAnyNextElement(sub2, Arrays.asList(y, z)); + expectAnyNextElement(sub2, new HashSet(Arrays.asList(y, z))); // z may still be pending if (!sub2.tryExpectCompletion(env.defaultTimeoutMillis())) { @@ -774,7 +774,7 @@ public void expectNextElement(ManualSubscriber sub, T expected) throws Interr } } - public void expectAnyNextElement(ManualSubscriber sub, Collection expected) throws InterruptedException { + public void expectAnyNextElement(ManualSubscriber sub, Set expected) throws InterruptedException { final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected)); if (!expected.contains(elem)) { StringBuilder sb = new StringBuilder();