diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java
index 025858c9..7ef86a6c 100644
--- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java
+++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java
@@ -11,6 +11,8 @@
 
 package org.reactivestreams.tck;
 
+import java.util.*;
+
 import org.reactivestreams.Processor;
 import org.reactivestreams.Publisher;
 import org.reactivestreams.Subscriber;
@@ -25,9 +27,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import java.util.HashSet;
-import java.util.Set;
-
 public abstract class IdentityProcessorVerification<T> extends WithHelperPublisher<T>
   implements SubscriberWhiteboxVerificationRules, PublisherVerificationRules {
 
@@ -189,6 +188,19 @@ public long maxSupportedSubscribers() {
       return Long.MAX_VALUE;
   }
 
+  /**
+   * Indicates that the tested implementation keeps a strict event ordering in respect to
+   * {@code onNext} and {@code onError}.
+   * Some {@code Processor} implementation may emit all {@code onNext}s received before emitting 
+   * any {@code onError}, similar to how {@code onComplete} is usually emitted after all
+   * previous {@code onNext} items have been emitted. The default implementation returns false,
+   * indicating that {@code onError} may cut ahead and get emitted even if there are
+   * {@code onNext} events ready for consumption (via {@code request()}) by the {@code Subscriber}.
+   */
+  public boolean strictEventOrdering() {
+      return false;
+  }
+  
   ////////////////////// TEST ENV CLEANUP /////////////////////////////////////
 
   @BeforeMethod
@@ -417,6 +429,14 @@ public TestSetup apply(Long aLong) throws Throwable {
           final Exception ex = new RuntimeException("Test exception");
           sendError(ex);
           sub1.expectError(ex);
+          
+          // some Processors may only emit the terminal error if
+          // all previously submitted onNext value has been
+          // consumed by the Subscriber
+          if (strictEventOrdering()) {
+              sub2.request(1);
+              expectNextElement(sub2, x);
+          }
           sub2.expectError(ex);
 
           env.verifyNoAsyncErrorsNoDelay();
@@ -668,7 +688,10 @@ public TestSetup apply(Long subscribers) throws Throwable {
           sub2.expectNone(); // since sub2 hasn't requested anything yet
 
           sub2.request(1);
-          expectNextElement(sub2, z);
+          // some processors could emit items from the beginning, some may
+          // cache for a limited time or count, therefore
+          // any of the first 3 items may appear after requesing one
+          expectAnyNextElement(sub2, new HashSet<T>(Arrays.asList(x, y, z)));
 
           if (totalRequests == 3) {
             expectRequest();
@@ -677,7 +700,20 @@ public TestSetup apply(Long subscribers) throws Throwable {
           // to avoid error messages during test harness shutdown
           sendCompletion();
           sub1.expectCompletion(env.defaultTimeoutMillis());
-          sub2.expectCompletion(env.defaultTimeoutMillis());
+          
+          // sub2 may not complete because it still has pending y or z
+          if (!sub2.tryExpectCompletion(env.defaultTimeoutMillis())) {
+              sub2.request(1);
+              expectAnyNextElement(sub2, new HashSet<T>(Arrays.asList(y, z)));
+              
+              // z may still be pending
+              if (!sub2.tryExpectCompletion(env.defaultTimeoutMillis())) {
+                  sub2.request(1);
+                  expectNextElement(sub2, z);
+                  // after z, it should get onComplete reasonably quickly
+                  sub2.expectCompletion(env.defaultTimeoutMillis());
+              }
+          }
 
           env.verifyNoAsyncErrorsNoDelay();
         }};
@@ -738,6 +774,20 @@ public void expectNextElement(ManualSubscriber<T> sub, T expected) throws Interr
       }
     }
 
+    public void expectAnyNextElement(ManualSubscriber<T> sub, Set<T> expected) throws InterruptedException {
+      final T elem = sub.nextElement(String.format("timeout while awaiting %s", expected));
+      if (!expected.contains(elem)) {
+        StringBuilder sb = new StringBuilder();
+        for (T t : expected) {
+            if (sb.length() > 0) {
+                sb.append(", ");
+            }
+            sb.append(String.format("`onNext(%s)`", t));
+        }
+        env.flop(String.format("Received `onNext(%s)` on downstream but expected any of %s", elem, sb));
+      }
+    }
+
     public T sendNextTFromUpstream() throws InterruptedException {
       final T x = nextT();
       sendNext(x);
diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
index 6b8456f6..e42c8b02 100644
--- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
+++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
@@ -506,6 +506,10 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
       received.expectCompletion(timeoutMillis, errorMsg);
     }
 
+    public boolean tryExpectCompletion(long timeoutMillis) throws InterruptedException {
+      return received.tryExpectCompletion(timeoutMillis);
+    }
+
     public <E extends Throwable> void expectErrorWithMessage(Class<E> expected, String requiredMessagePart) throws Exception {
       expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis());
     }
@@ -973,6 +977,22 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
       } // else, ok
     }
 
+    public boolean tryExpectCompletion(long timeoutMillis) throws InterruptedException {
+      long end = System.currentTimeMillis() + timeoutMillis;
+      do {
+        Optional<T> value = abq.peek();
+        if (value != null) {
+          if (!value.isDefined()) {
+            abq.poll();
+            return true;
+          }
+          return false;
+        }
+        Thread.sleep(1);
+      } while (System.currentTimeMillis() < end);
+      return false;
+    }
+
     @SuppressWarnings("unchecked")
     public <E extends Throwable> E expectError(Class<E> clazz, long timeoutMillis, String errorMsg) throws Exception {
       Thread.sleep(timeoutMillis);
diff --git a/tck/src/test/java/org/reactivestreams/tck/ReplayProcessor.java b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessor.java
new file mode 100644
index 00000000..ee9a57ec
--- /dev/null
+++ b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessor.java
@@ -0,0 +1,336 @@
+/************************************************************************
+ * Licensed under Public Domain (CC0)                                    *
+ *                                                                       *
+ * To the extent possible under law, the person who associated CC0 with  *
+ * this code has waived all copyright and related or neighboring         *
+ * rights to this code.                                                  *
+ *                                                                       *
+ * You should have received a copy of the CC0 legalcode along with this  *
+ * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
+ ************************************************************************/
+
+package org.reactivestreams.tck;
+
+import java.util.*;
+import java.util.concurrent.atomic.*;
+
+import org.reactivestreams.*;
+
+final class ReplayProcessor<T> implements Processor<T, T> {
+
+    final AtomicReference<Subscription> upstream;
+    
+    final List<T> list;
+
+    final boolean delayError;
+    
+    final AtomicReference<ReplaySubscription<T>[]> subscribers;
+    
+    final AtomicReference<Throwable> error;
+    
+    volatile int size;
+    
+    volatile boolean done;
+    
+    @SuppressWarnings("rawtypes")
+    static final ReplaySubscription[] EMPTY = new ReplaySubscription[0];
+
+    @SuppressWarnings("rawtypes")
+    static final ReplaySubscription[] TERMINATED = new ReplaySubscription[0];
+
+    static final Subscription CANCELLED = new EmptySubscription();
+    
+    ReplayProcessor(boolean delayError) {
+        this.delayError = delayError;
+        this.list = new ArrayList<T>();
+        this.upstream = new AtomicReference<Subscription>();
+        this.subscribers = new AtomicReference<ReplaySubscription<T>[]>(EMPTY);
+        this.error = new AtomicReference<Throwable>();
+    }
+    
+    @Override
+    public void onSubscribe(Subscription s) {
+        if (upstream.compareAndSet(null, s)) {
+            s.request(Long.MAX_VALUE);
+        } else {
+            s.cancel();
+        }
+    }
+
+    @Override
+    public void onNext(T t) {
+        if (t == null) {
+            throw new NullPointerException();
+        }
+        
+        list.add(t);
+        size++;
+        for (ReplaySubscription<T> rs : subscribers.get()) {
+            replay(rs);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onError(Throwable t) {
+        if (t == null) {
+            throw new NullPointerException();
+        }
+        if (error.compareAndSet(null, t)) {
+            done = true;
+            for (ReplaySubscription<T> rs : subscribers.getAndSet(TERMINATED)) {
+                replay(rs);
+            }
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void onComplete() {
+        done = true;
+        for (ReplaySubscription<T> rs : subscribers.getAndSet(TERMINATED)) {
+            replay(rs);
+        }
+    }
+
+    @Override
+    public void subscribe(Subscriber<? super T> s) {
+        ReplaySubscription<T> rs = new ReplaySubscription<T>(s, this);
+        s.onSubscribe(rs);
+        if (add(rs)) {
+            if (rs.isCancelled()) {
+                remove(rs);
+                return;
+            }
+        }
+        replay(rs);
+    }
+
+    public void replay(ReplaySubscription<T> rs) {
+        if (rs.getAndIncrement() != 0) {
+            return;
+        }
+        
+        int missed = 1;
+        Subscriber<? super T> a = rs.actual;
+        List<T> list = this.list;
+        int idx = rs.index;
+        AtomicLong req = rs.requested;
+        
+        for (;;) {
+            
+            long r = req.get();
+            
+            while (idx != r) {
+                if (req.get() == Long.MIN_VALUE) {
+                    return;
+                }
+                
+                if (rs.badRequest) {
+                    a.onError(new IllegalArgumentException("§3.9 violated: positive request amount required"));
+                    return;
+                }
+                
+                boolean d = done;
+                
+                if (d && !delayError) {
+                    Throwable ex = error.get();
+                    if (ex != null) {
+                        a.onError(ex);
+                        return;
+                    }
+                }
+                
+                boolean empty = idx >= size;
+                
+                if (d && empty) {
+                    Throwable ex = error.get();
+                    if (ex != null) {
+                        a.onError(ex);
+                    } else {
+                        a.onComplete();
+                    }
+                    return;
+                }
+                
+                if (empty) {
+                    break;
+                }
+                
+                a.onNext(list.get(idx++));
+            }
+            
+            if (idx == r) {
+                if (req.get() == Long.MIN_VALUE) {
+                    return;
+                }
+                
+                if (rs.badRequest) {
+                    a.onError(new IllegalArgumentException("§3.9 violated: positive request amount required"));
+                    return;
+                }
+                
+                boolean d = done;
+                
+                if (d && !delayError) {
+                    Throwable ex = error.get();
+                    if (ex != null) {
+                        a.onError(ex);
+                        return;
+                    }
+                }
+                
+                boolean empty = idx >= size;
+                if (d && empty) {
+                    Throwable ex = error.get();
+                    if (ex != null) {
+                        a.onError(ex);
+                    } else {
+                        a.onComplete();
+                    }
+                    return;
+                }
+            }
+            
+            int w = rs.get();
+            if (w == missed) {
+                rs.index = idx;
+                missed = rs.addAndGet(-missed);
+                if (missed == 0) {
+                    break;
+                }
+            } else {
+                missed = w;
+            }
+        }
+    }
+    
+    boolean add(ReplaySubscription<T> rs) {
+        for (;;) {
+            ReplaySubscription<T>[] a = subscribers.get();
+            if (a == TERMINATED) {
+                return false;
+            }
+            int n = a.length;
+            @SuppressWarnings("unchecked")
+            ReplaySubscription<T>[] b = new ReplaySubscription[n + 1];
+            System.arraycopy(a, 0, b, 0, n);
+            b[n] = rs;
+            
+            if (subscribers.compareAndSet(a, b)) {
+                return true;
+            }
+        }
+    }
+    
+    void remove(ReplaySubscription<T> rs) {
+        for (;;) {
+            ReplaySubscription<T>[] a = subscribers.get();
+            int n = a.length;
+            if (n == 0) {
+                break;
+            }
+            
+            int j = -1;
+            
+            for (int i = 0; i < n; i++) {
+                if (a[i] == rs) {
+                    j = i;
+                    break;
+                }
+            }
+            
+            if (j < 0) {
+                break;
+            }
+            if (n == 1) {
+                @SuppressWarnings("unchecked")
+                ReplaySubscription<T>[] b = TERMINATED;
+                if (subscribers.compareAndSet(a, b)) {
+                    Subscription s = upstream.getAndSet(CANCELLED);
+                    if (s != null) {
+                        s.cancel();
+                    }
+                    break;
+                }
+            } else {
+                @SuppressWarnings("unchecked")
+                ReplaySubscription<T>[] b = new ReplaySubscription[n - 1];
+                System.arraycopy(a, 0, b, 0, j);
+                System.arraycopy(a, j + 1, b, j, n - j - 1);
+                if (subscribers.compareAndSet(a, b)) {
+                    break;
+                }
+            }
+        }
+    }
+    
+    static final class ReplaySubscription<T>
+    extends AtomicInteger
+    implements Subscription {
+        
+        /** */
+        private static final long serialVersionUID = -3704758100845141134L;
+
+        final ReplayProcessor<T> parent;
+        
+        final Subscriber<? super T> actual;
+        
+        final AtomicLong requested;
+        
+        long emitted;
+        
+        int index;
+        
+        volatile boolean badRequest;
+        
+        ReplaySubscription(Subscriber<? super T> actual, ReplayProcessor<T> parent) {
+            this.actual = actual;
+            this.requested = new AtomicLong();
+            this.parent = parent;
+        }
+
+        @Override
+        public void request(long n) {
+            if (n <= 0L) {
+                badRequest = true;
+                parent.replay(this);
+            } else {
+                for (;;) {
+                    long r = requested.get();
+                    if (r == Long.MIN_VALUE) {
+                        break;
+                    }
+                    long u = r + n;
+                    if (u < 0L) {
+                        u = Long.MAX_VALUE;
+                    }
+                    if (requested.compareAndSet(r, u)) {
+                        parent.replay(this);
+                        break;
+                    }
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            if (requested.getAndSet(Long.MIN_VALUE) != Long.MIN_VALUE) {
+                parent.remove(this);
+            }
+        }
+        
+        public boolean isCancelled() {
+            return requested.get() == Long.MIN_VALUE;
+        }
+    }
+    
+    static final class EmptySubscription implements Subscription {
+        @Override
+        public void request(long n) {
+        }
+        @Override
+        public void cancel() {
+        }
+    }
+}
diff --git a/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorDelayErrorAsProcessorTest.java b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorDelayErrorAsProcessorTest.java
new file mode 100644
index 00000000..40ec742f
--- /dev/null
+++ b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorDelayErrorAsProcessorTest.java
@@ -0,0 +1,48 @@
+/************************************************************************
+ * Licensed under Public Domain (CC0)                                    *
+ *                                                                       *
+ * To the extent possible under law, the person who associated CC0 with  *
+ * this code has waived all copyright and related or neighboring         *
+ * rights to this code.                                                  *
+ *                                                                       *
+ * You should have received a copy of the CC0 legalcode along with this  *
+ * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
+ ************************************************************************/
+
+package org.reactivestreams.tck;
+
+import java.util.concurrent.*;
+
+import org.reactivestreams.*;
+
+public class ReplayProcessorDelayErrorAsProcessorTest extends IdentityProcessorVerification<Integer> {
+
+    public ReplayProcessorDelayErrorAsProcessorTest() {
+        super(new TestEnvironment());
+    }
+
+    @Override
+    public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
+        return new ReplayProcessor<Integer>(true);
+    }
+
+    @Override
+    public Publisher<Integer> createFailedPublisher() {
+        return null;
+    }
+
+    @Override
+    public ExecutorService publisherExecutorService() {
+        return Executors.newCachedThreadPool();
+    }
+
+    @Override
+    public Integer createElement(int element) {
+        return element;
+    }
+
+    @Override
+    public boolean strictEventOrdering() {
+        return true;
+    }
+}
diff --git a/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorFailFastAsProcessorTest.java b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorFailFastAsProcessorTest.java
new file mode 100644
index 00000000..8a260c7f
--- /dev/null
+++ b/tck/src/test/java/org/reactivestreams/tck/ReplayProcessorFailFastAsProcessorTest.java
@@ -0,0 +1,44 @@
+/************************************************************************
+ * Licensed under Public Domain (CC0)                                    *
+ *                                                                       *
+ * To the extent possible under law, the person who associated CC0 with  *
+ * this code has waived all copyright and related or neighboring         *
+ * rights to this code.                                                  *
+ *                                                                       *
+ * You should have received a copy of the CC0 legalcode along with this  *
+ * work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
+ ************************************************************************/
+
+package org.reactivestreams.tck;
+
+import java.util.concurrent.*;
+
+import org.reactivestreams.*;
+
+public class ReplayProcessorFailFastAsProcessorTest extends IdentityProcessorVerification<Integer> {
+
+    public ReplayProcessorFailFastAsProcessorTest() {
+        super(new TestEnvironment());
+    }
+
+    @Override
+    public Processor<Integer, Integer> createIdentityProcessor(int bufferSize) {
+        return new ReplayProcessor<Integer>(false);
+    }
+
+    @Override
+    public Publisher<Integer> createFailedPublisher() {
+        return null;
+    }
+
+    @Override
+    public ExecutorService publisherExecutorService() {
+        return Executors.newCachedThreadPool();
+    }
+
+    @Override
+    public Integer createElement(int element) {
+        return element;
+    }
+
+}