Open
Description
@Override
public void run(BlackboxTestStage stage) throws InterruptedException {
triggerRequest(stage.subProxy().sub());
final long requested = stage.expectRequest();// assuming subscriber wants to consume elements...
final long signalsToEmit = Math.min(requested, 512); // protecting against Subscriber which sends ridiculously large demand
// should cope with up to requested number of elements
for (int i = 0; i < signalsToEmit && sampleIsCancelled(stage, i, 10); i++)
stage.signalNext();
// we complete after `signalsToEmit` (which can be less than `requested`),
// which is legal under https://github.com/reactive-streams/reactive-streams-jvm#1.2
stage.sendCompletion();
}
/**
* In order to allow some "skid" and not check state on each iteration,
* only check {@code stage.isCancelled} every {@code checkInterval}'th iteration.
*/
private boolean sampleIsCancelled(BlackboxTestStage stage, int i, int checkInterval) throws InterruptedException {
if (i % checkInterval == 0) return stage.isCancelled();
else return false;
}
I have a question about the use of the sampleIsCancelled
method in the snippet above. It seems strange that the for
loop uses that method's (naked) return value rather than a negation thereof. As far as I can see in the typical scenario it leads to the loop's body being executed at most once. Was it by design?
Metadata
Metadata
Assignees
Labels
No labels