FluxTakeUntilOther does not cancel infinite Main on async companion error #4230

@Sage-Pierce

Description

When source.takeUntilOther(other) is applied to an infinite source Flux, the source is not canceled when the companion other emits an error asynchronously.

Expected Behavior

An infinite Flux source with takeUntilOther applied should be canceled if/when the other companion asynchronously emits an error.

Actual Behavior

The source Flux is not canceled and is left dangling.

Steps to Reproduce

The test method FluxTakeUntilOtherTest.otherSignalsError() can be updated to the following to expose the bug:

    @Test
    public void otherSignalsError() {
        AssertSubscriber<Integer> ts = AssertSubscriber.create();
        AtomicBoolean mainCancelled = new AtomicBoolean(false);
        AtomicBoolean otherCancelled = new AtomicBoolean(false);

        Flux<Object> other =
            Flux.error(new RuntimeException("forced " + "failure"))
                .delaySubscription(Duration.ofMillis(1)) // Make other async
                .doOnCancel(() -> otherCancelled.set(true));
        Flux.<Integer>never() // Changed to infinite source
            .doOnCancel(() -> mainCancelled.set(true))
            .takeUntilOther(other)
            .subscribe(ts);

        ts.await() // Await termination
            .assertNoValues()
            .assertNotComplete()
            .assertError(RuntimeException.class)
            .assertErrorMessage("forced failure");

        Assertions.assertThat(mainCancelled).isTrue();
        Assertions.assertThat(otherCancelled).isFalse();
    }

Note that this bug does not occur if the source is finite and synchronous, nor does it occur if the other companion is finite and synchronous. In the former case, the source can complete before other emits asynchronously. In the latter case, other is subscribed and emits its error immediately, before main has been subscribed, which leaves the main subscription reference pointing at a canceled subscription. When main is subsequently subscribed, the "already subscribed" guard is hit (the reference already holds a canceled subscription), so the incoming main subscription does get canceled.
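To illustrate why the synchronous case happens to work, here is a minimal sketch of a set-once subscription slot with a "canceled" sentinel. It uses only java.util.concurrent.Flow and is an assumption about the general shape of such a guard, not Reactor's actual code; the names SetOnceGuardSketch, setMain, cancelMain and recording are hypothetical.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical model of a set-once subscription slot; not Reactor's actual code.
public class SetOnceGuardSketch {

    // Sentinel meaning "this slot has been canceled".
    static final Flow.Subscription CANCELLED = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    final AtomicReference<Flow.Subscription> main = new AtomicReference<>();

    // Marks the slot as canceled and cancels whatever subscription it held, if any.
    void cancelMain() {
        Flow.Subscription previous = main.getAndSet(CANCELLED);
        if (previous != null && previous != CANCELLED) {
            previous.cancel();
        }
    }

    // Stores the subscription if the slot is empty; otherwise cancels the newcomer.
    boolean setMain(Flow.Subscription s) {
        if (main.compareAndSet(null, s)) {
            return true;
        }
        s.cancel(); // guard hit: the slot is already occupied or already canceled
        return false;
    }

    // Builds a subscription that records whether cancel() was called on it.
    static Flow.Subscription recording(AtomicBoolean cancelled) {
        return new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { cancelled.set(true); }
        };
    }

    // Synchronous companion error: the slot is canceled before main subscribes.
    static boolean synchronousCase() {
        SetOnceGuardSketch slot = new SetOnceGuardSketch();
        AtomicBoolean mainCancelled = new AtomicBoolean();
        slot.cancelMain();                      // companion has already failed
        slot.setMain(recording(mainCancelled)); // late main subscription is rejected
        return mainCancelled.get();
    }

    public static void main(String[] args) {
        System.out.println("main cancelled: " + synchronousCase()); // prints "main cancelled: true"
    }
}
```

In this model the main source is canceled only as a side effect of arriving late. When the companion fails after main has already been stored in the slot, nothing cancels main unless the error path does so explicitly, which is the asynchronous situation described in this issue.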

Possible Solution

Main should be canceled if/when other emits an error asynchronously.
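As a rough illustration of the proposed rule, the sketch below models only the termination handling, using plain JDK types. It is not a patch against FluxTakeUntilOther; the class TakeUntilTerminationSketch and its methods are hypothetical, and the two Runnable callbacks stand in for canceling the real main and companion subscriptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of the termination rule; not Reactor's implementation.
public class TakeUntilTerminationSketch {

    private final AtomicBoolean terminated = new AtomicBoolean();
    private final Runnable cancelMain;
    private final Runnable cancelOther;
    final List<String> downstream = new ArrayList<>();

    TakeUntilTerminationSketch(Runnable cancelMain, Runnable cancelOther) {
        this.cancelMain = cancelMain;
        this.cancelOther = cancelOther;
    }

    // Companion failed: cancel the main source, then forward the error once.
    void onOtherError(Throwable t) {
        if (terminated.compareAndSet(false, true)) {
            cancelMain.run();
            downstream.add("onError: " + t.getMessage());
        }
    }

    // Main failed: cancel the companion, then forward the error once.
    void onMainError(Throwable t) {
        if (terminated.compareAndSet(false, true)) {
            cancelOther.run();
            downstream.add("onError: " + t.getMessage());
        }
    }

    // Simulates a companion that errors from another thread.
    // Returns { mainCancelled, otherCancelled }.
    static boolean[] asyncCompanionError() {
        AtomicBoolean mainCancelled = new AtomicBoolean();
        AtomicBoolean otherCancelled = new AtomicBoolean();
        TakeUntilTerminationSketch sketch = new TakeUntilTerminationSketch(
                () -> mainCancelled.set(true),
                () -> otherCancelled.set(true));
        Thread companion = new Thread(
                () -> sketch.onOtherError(new RuntimeException("forced failure")));
        companion.start();
        try {
            companion.join(); // wait for the asynchronous error to be handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new boolean[] { mainCancelled.get(), otherCancelled.get() };
    }

    public static void main(String[] args) {
        boolean[] flags = asyncCompanionError();
        System.out.println("mainCancelled=" + flags[0] + ", otherCancelled=" + flags[1]);
    }
}
```

The resulting flags match the assertions in the updated test above: the main source is canceled, while the companion, which terminated on its own, is not.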

Your Environment

  • Reactor version(s) used: 3.8.4
  • Other relevant libraries versions (eg. netty, ...): N/A
  • JVM version (java -version): 17.0.16
  • OS and version (eg uname -a): Darwin Kernel Version 25.3.0
