mergeScan drops accumulator with async merging source #6612

Open
imcotton opened this issue Sep 23, 2021 · 12 comments

@imcotton
Contributor

imcotton commented Sep 23, 2021

Bug Report

Current Behavior

sync

Rx.range(1, 3).pipe(

    Rx.mergeScan((acc, n) =>

        Rx.of([ ...acc, n ])

    , [ 0 ]),

).subscribe(console.log);

// [ 0, 1 ]
// [ 0, 1, 2 ]
// [ 0, 1, 2, 3 ]

async

Rx.range(1, 3).pipe(

    Rx.mergeScan((acc, n) =>

        Rx.of([ ...acc, n ]).pipe(
            Rx.delay(5),
        )

    , [ 0 ]),

).subscribe(console.log);

// [ 0, 1 ]
// [ 0, 2 ]
// [ 0, 3 ]

Expected behavior

The async version should accumulate values the same way the sync one does.

Reproduction

stackblitz

Environment

  • Runtime: Node v12, v14, v16
  • RxJS version: v7.3.0

Possible Solution

Rx.range(1, 3).pipe(

    Rx.mergeScan((acc, n) =>

        Rx.of([ ...acc, n ]).pipe(
            Rx.delay(5),
        )

    , [ 0 ], 1),  // setting concurrent to 1 works, but loses parallelism

).subscribe(console.log);

Additional context/Screenshots

#2737

@voliva
Contributor

voliva commented Sep 23, 2021

I think it's as expected. From the docs of mergeScan:

The first parameter of the mergeScan is an accumulator function which is being called every time the source Observable emits a value. mergeScan will subscribe to the value returned by the accumulator function and will emit values to the subscriber emitted by inner Observable.
mergeScan internally keeps the value of the acc parameter: as long as the source Observable emits without inner Observable emitting, the acc will be set to seed. The next time the inner Observable emits a value, mergeScan will internally remember it and it will be passed to the accumulator function as acc parameter the next time source emits.

range(1,3) emits 3 values synchronously. The sequence of events will be:

  • mergeScan will call the accumulator function with acc = seed, n = 1 => nothing gets emitted from the inner observable
  • mergeScan will call it again with acc = seed, n = 2 => nothing gets emitted
  • mergeScan will call it again with acc = seed, n = 3 => nothing gets emitted
  • [synchronous block ends, event loop ticks until delay triggers. Let's assume they come in order]
  • The inner observable emits [0, 1]. The accumulator is internally set to [0, 1] and it emits [0, 1]
  • The inner observable emits [0, 2]. The accumulator is internally set to [0, 2] and it emits [0, 2]
  • The inner observable emits [0, 3]. The accumulator is internally set to [0, 3] and it emits [0, 3]
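The sequence above can be replayed with a small, dependency-free TypeScript model. This is a hypothetical sketch of mergeScan's accumulator bookkeeping, not the RxJS implementation. replayMergeScan, Step and the helpers are illustrative names. Each inner Observable is assumed to emit exactly once, and the arrival order of source values and inner emissions is written out by hand.

```typescript
// Hypothetical replay model of mergeScan's accumulator bookkeeping.
// Not the RxJS implementation; assumes every inner Observable emits exactly once.
type Step<A> =
  | { kind: "source"; value: A } // the source emits value
  | { kind: "inner"; id: number }; // the inner created for the id-th source value emits

function replayMergeScan<A, S>(
  steps: Step<A>[],
  accumulator: (acc: S, value: A) => S,
  seed: S,
): S[] {
  let acc = seed; // the value mergeScan remembers between accumulator calls
  const pending = new Map<number, S>(); // inner results that have not been emitted yet
  const emitted: S[] = [];
  let nextId = 0;
  for (const step of steps) {
    if (step.kind === "source") {
      // The accumulator sees acc as it is *now*, even if earlier inners are still pending.
      pending.set(nextId++, accumulator(acc, step.value));
    } else if (pending.has(step.id)) {
      const value = pending.get(step.id) as S;
      pending.delete(step.id);
      acc = value; // the latest inner emission overwrites the remembered acc
      emitted.push(value);
    }
  }
  return emitted;
}

const append = (acc: number[], n: number): number[] => [...acc, n];
const src = (value: number): Step<number> => ({ kind: "source", value });
const inner = (id: number): Step<number> => ({ kind: "inner", id });

// Sync inner (Rx.of): each inner emits before range emits the next value.
const syncOrder = [src(1), inner(0), src(2), inner(1), src(3), inner(2)];
// Delayed inner (delay(5)): range(1, 3) emits everything before any delay fires.
const delayedOrder = [src(1), src(2), src(3), inner(0), inner(1), inner(2)];

const syncResult = replayMergeScan(syncOrder, append, [0]); // [[0, 1], [0, 1, 2], [0, 1, 2, 3]]
const delayedResult = replayMergeScan(delayedOrder, append, [0]); // [[0, 1], [0, 2], [0, 3]]
console.log(syncResult, delayedResult);
```

Only the order of events differs between the two runs. In the delayed order, every accumulator call happens while acc still equals the seed.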

If you were to change your source to be:

Rx.merge(Rx.range(1,3), Rx.timer(1000))

You will see that it then also emits [0, 3, 0]. timer emits 0, so mergeScan calls the accumulator with acc = [0, 3], n = 0. After the delay it emits [0, 3, 0] and sets the internal value to [0, 3, 0].

@imcotton
Contributor Author

Thanks for the detailed explanation of the current behavior. However, it would be unfortunate if mergeScan were deliberately designed to behave differently for sync and async inner observables. I'd rather see it as an oversight in spec coverage, for example in these use cases:

  • mocking (sync Rx.of) vs. production (async fetch)
  • fresh (async) vs. cached (sync)

In your explanation, each round runs with acc = seed, n = 1/2/3. It makes sense for n to be taken synchronously, but acc should rather be updated from the latest resolved result. Order-wise, mergeScan should behave the same as merge.

@imcotton
Contributor Author

In docs:

Observable emitting, the acc will be set to seed. The next time the inner Observable emits a value, mergeScan will internally remember it and it will be passed to the accumulator function as acc parameter the next time source emits.

@voliva
Contributor

voliva commented Sep 25, 2021

The issue is not about sync vs. async, but about the delay.

If you take your original code with range and delay, it follows the steps in my previous response. The problem is that by the time range emits a new value, mergeScan hasn't yet received the value of the previous inner observable, so it can't update acc. It technically just can't do what you are suggesting, because the value hasn't arrived yet.

It's just semantics. The mergeScan operator does exactly what the docs say.

Your issue also gets solved in this example:

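import { delay, interval, mergeScan, of, take } from 'rxjs';

interval(100).pipe(
  take(3),
  mergeScan((acc, n) =>
    of([...acc, n]).pipe(
      delay(5),
    ),
    [0]
  )
).subscribe(console.log);

// interval emits 0, 1, 2:
// [ 0, 0 ]
// [ 0, 0, 1 ]
// [ 0, 0, 1, 2 ]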

And this is all async. In this case, interval is emitting every 100ms, which is enough time for the inner observable to emit the value so that acc gets updated.

@imcotton
Contributor Author

imcotton commented Sep 25, 2021

I agree. Then it becomes the user's duty to keep things running in order both upstream and downstream (i.e. sequentially). That amounts to setting concurrent to 1, as mentioned in Possible Solution.

Thinking more on this, I realize it's impossible to have a parallel mergeScan in its current shape:

mergeScan :: (c -> a -> Obs c)               -> c -> Obs a -> Obs c

Instead you need:

mergeReduce :: (a -> Obs b) -> (c -> b -> c) -> c -> Obs a -> Obs c
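The sketch below is a hypothetical, dependency-free TypeScript replay model. It is not an RxJS operator, and replayMergeReduce and Step are illustrative names. It shows why the mergeReduce shape allows parallelism. The projection a -> b never sees the accumulator, so it can start as soon as the source emits. Only the synchronous reducer c -> b -> c touches the state, once per arrival. Each inner is assumed to emit exactly once, and project returns that value directly.

```typescript
// Hypothetical replay model of the proposed mergeReduce (not an RxJS operator).
// Assumes every projected inner emits exactly once; project returns that value directly.
type Step<A> =
  | { kind: "source"; value: A } // the source emits value
  | { kind: "inner"; id: number }; // the projection of the id-th source value emits

function replayMergeReduce<A, B, C>(
  steps: Step<A>[],
  project: (value: A) => B, // stands in for a -> Obs b; never sees the accumulator
  reducer: (acc: C, result: B) => C, // c -> b -> c, synchronous
  seed: C,
): C[] {
  let acc = seed;
  const inFlight = new Map<number, B>(); // projections started but not yet emitted
  const emitted: C[] = [];
  let nextId = 0;
  for (const step of steps) {
    if (step.kind === "source") {
      // Start immediately: nothing here depends on acc, so projections can overlap.
      inFlight.set(nextId++, project(step.value));
    } else if (inFlight.has(step.id)) {
      const result = inFlight.get(step.id) as B;
      inFlight.delete(step.id);
      acc = reducer(acc, result); // the state is only touched here, one arrival at a time
      emitted.push(acc);
    }
  }
  return emitted;
}

const src = (value: number): Step<number> => ({ kind: "source", value });
const inner = (id: number): Step<number> => ({ kind: "inner", id });
const identity = (n: number): number => n;
const append = (acc: number[], n: number): number[] => [...acc, n];

// range(1, 3) with delay(5): every source value arrives before any inner emits.
const inOrder = replayMergeReduce([src(1), src(2), src(3), inner(0), inner(1), inner(2)], identity, append, [0]);
// [[0, 1], [0, 1, 2], [0, 1, 2, 3]]

// Inners finishing out of order are folded in arrival order, not dropped.
const outOfOrder = replayMergeReduce([src(1), src(2), src(3), inner(2), inner(0), inner(1)], identity, append, [0]);
// [[0, 3], [0, 3, 1], [0, 3, 1, 2]]
console.log(inOrder, outOfOrder);
```

Folding each result on arrival is what mergeMap followed by scan does. Parallelism is kept at the cost of results being accumulated in completion order rather than source order.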

At this point it's no longer a bug report for mergeScan but rather a potential feature request, and I'm not sure it's worth the trouble to pursue in RxJS...

@voliva
Contributor

voliva commented Sep 26, 2021

Wouldn't this:

mergeReduce :: (a -> Obs b) -> (c -> b -> c) -> c -> Obs a -> Obs c

Just be:

source$.pipe(
  mergeMap(v => of(v).pipe(delay(5))),
  scan((acc, v) => [...acc, v], [])
)

If that's the case I don't think we need another operator for something that can be composed from two others.

@imcotton
Contributor Author

You are right! Who'd have thought mergeScan was supposed to be doing mergeMap + scan in the first place 😅

If that's the case I don't think we need another operator for something that can be composed from two others.

I have two takes:

  1. Much like concatMap (map + concatAll) and mergeMap (map + mergeAll), mergeScan has its legitimacy in that sense.
  2. Given that the current mergeScan is limited by its implementation and arguably error-prone, I'm in favor of deprecating it (see Remove mergeScan or provide better documentation for it #2737).

@hgaleh
Contributor

hgaleh commented Nov 6, 2021

I think there are some incorrect presumptions in mergeScan:
1. The accumulator returns an Observable, which in turn may emit multiple values, not a single one.
2. Observables are not guaranteed to emit in the order they were created: [0, 3] may arrive before [0, 1].
3. There is a race condition on the state.
4. The accumulator is called without checking that the previous inner subscription has completed!!

I think the only solution is to call the accumulator after the previous value has been received.
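The rule proposed above can be sketched as a dependency-free TypeScript replay model. It is hypothetical and not RxJS code: replaySerializedScan and Step are illustrative names, and each inner is assumed to emit exactly once and then complete. Under those assumptions it behaves roughly like mergeScan with concurrent set to 1 from the Possible Solution. Later source values are buffered until the active inner finishes.

```typescript
// Hypothetical replay model: the accumulator is only called once the previous
// inner has emitted (roughly mergeScan with concurrent = 1). Not RxJS source;
// assumes every inner emits exactly once and then completes.
type Step<A> =
  | { kind: "source"; value: A } // the source emits value
  | { kind: "inner" }; // the single active inner emits and completes

function replaySerializedScan<A, S>(
  steps: Step<A>[],
  accumulator: (acc: S, value: A) => S,
  seed: S,
): S[] {
  let acc = seed;
  const buffer: A[] = []; // source values waiting for the active inner to finish
  let active: { result: S } | null = null; // at most one inner in flight
  const emitted: S[] = [];
  for (const step of steps) {
    if (step.kind === "source") {
      buffer.push(step.value);
    } else if (active !== null) {
      acc = active.result; // the inner's value becomes the new acc
      emitted.push(acc);
      active = null;
    }
    if (active === null && buffer.length > 0) {
      // Called only now, so the accumulator always sees the latest acc.
      active = { result: accumulator(acc, buffer.shift() as A) };
    }
  }
  return emitted;
}

const src = (value: number): Step<number> => ({ kind: "source", value });
const innerDone: Step<number> = { kind: "inner" };
const append = (acc: number[], n: number): number[] => [...acc, n];

// Same delayed order as range(1, 3) + delay(5): all source values arrive first.
const result = replaySerializedScan([src(1), src(2), src(3), innerDone, innerDone, innerDone], append, [0]);
// [[0, 1], [0, 1, 2], [0, 1, 2, 3]]
console.log(result);
```

The state is now consistent, but the inners run strictly one after another, which is the loss of parallelism noted in the Possible Solution.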

@hgaleh
Contributor

hgaleh commented Nov 6, 2021

Wouldn't this:

mergeReduce :: (a -> Obs b) -> (c -> b -> c) -> c -> Obs a -> Obs c

Just be:

source$.pipe(
  mergeMap(v => of(v).pipe(delay(5))),
  scan((acc, v) => [...acc, v], [])
)

If that's the case I don't think we need another operator for something that can be composed from two others.

The result of mergeMap is needed to start a new Observable, so a combination of mergeMap and scan cannot fulfill the requirement.

@voliva
Contributor

voliva commented Nov 6, 2021

I think there are some incorrect presumptions in mergeScan:
1. The accumulator returns an Observable, which in turn may emit multiple values, not a single one.
2. Observables are not guaranteed to emit in the order they were created: [0, 3] may arrive before [0, 1].
3. There is a race condition on the state.
4. The accumulator is called without checking that the previous inner subscription has completed!!

I think the only solution is to call the accumulator after the previous value has been received.

@hgaleh I'm not sure what the purpose of mergeScan is myself. As you said, it's quite easy to run into race conditions with it.

But I think switchScan is a bit easier to reason about and removes any possible race condition. If I understood it correctly, when the source emits, it unsubscribes from the inner observable before calling the accumulator function again, so only one inner observable is active at a time. This solves points [2, 3, 4], and point [1] isn't a problem per se.
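The switchScan behavior described above can be illustrated with a hypothetical, dependency-free TypeScript replay model. It is not the RxJS source; replaySwitchScan and Step are illustrative names, and each inner is assumed to emit exactly once. A new source value replaces the active inner, so at most one accumulator result is ever pending. Emissions from replaced inners are ignored, as if they had been unsubscribed.

```typescript
// Hypothetical replay model of switchScan (not the RxJS source).
// Assumes every inner emits exactly once; replaced inners are treated as unsubscribed.
type Step<A> =
  | { kind: "source"; value: A } // the source emits value
  | { kind: "inner"; id: number }; // the inner created for the id-th source value emits

function replaySwitchScan<A, S>(
  steps: Step<A>[],
  accumulator: (acc: S, value: A) => S,
  seed: S,
): S[] {
  let acc = seed;
  let current: { id: number; result: S } | null = null; // the only active inner
  const emitted: S[] = [];
  let nextId = 0;
  for (const step of steps) {
    if (step.kind === "source") {
      // Replacing current models unsubscribing from the previous inner.
      current = { id: nextId++, result: accumulator(acc, step.value) };
    } else if (current !== null && current.id === step.id) {
      acc = current.result;
      emitted.push(acc);
      current = null; // emitted once, assumed complete
    }
    // Any other inner step belongs to a replaced inner and is ignored.
  }
  return emitted;
}

const append = (acc: number[], n: number): number[] => [...acc, n];
const src = (value: number): Step<number> => ({ kind: "source", value });
const inner = (id: number): Step<number> => ({ kind: "inner", id });

// range(1, 3) + delay(5): only the last inner survives.
const delayed = replaySwitchScan([src(1), src(2), src(3), inner(0), inner(1), inner(2)], append, [0]);
// [[0, 3]]

// Spaced-out source values (as in the interval(100) example): each inner emits first.
const spaced = replaySwitchScan([src(1), inner(0), src(2), inner(1), src(3), inner(2)], append, [0]);
// [[0, 1], [0, 1, 2], [0, 1, 2, 3]]
console.log(delayed, spaced);
```

The delayed run shows the trade-off: there is no race on acc, but intermediate results from replaced inners are discarded.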

The result of mergeMap is needed to start a new Observable, so a combination of mergeMap and scan cannot fulfill the requirement.

Yes, I know mergeScan can't be expressed as a combination of mergeMap + scan, but mergeMap + scan is what OP was actually looking for.

@hgaleh
Contributor

hgaleh commented Nov 9, 2021

@imcotton, check the test "should not stop ongoing async projections when source completes" in the mergeScan suite of mergeScan-spec.ts. It seems that these kinds of results are intentional.

const result = e1.pipe(mergeScan((acc, x) => of(acc.concat(x)).pipe(delay(t)), [] as string[]));
acc = []; x = 'b'; acc.concat(x) = ['b']; t = 5
acc = []; x = 'c'; acc.concat(x) = ['c']
acc = ['b']; x = 'd'; acc.concat(x) = ['b', 'd']
acc = ['c']; x = 'e'; acc.concat(x) = ['c', 'e']
acc = ['b', 'd']; x = 'f'; acc.concat(x) = ['b', 'd', 'f']
acc = ['c', 'e']; x = 'g'; acc.concat(x) = ['c', 'e', 'g']

@benlesh, is this behaviour really expected? If so, we should close this bug, because I see tests that require exactly this type of result.

@benlesh
Member

benlesh commented Feb 17, 2022

What if I told you all that mergeScan was something that was made up by an engineer at Netflix many years ago and was never really clearly defined?

That said, changing this now would certainly be a breaking change.
