Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 58 additions & 41 deletions reactivex/observable/combinelatest.py
Original file line number Diff line number Diff line change
@@ -1,76 +1,93 @@
from typing import Any, List, Optional, Tuple
from typing import Any, List, Optional, Tuple, TypeVar
import threading

from reactivex import Observable, abc
from reactivex.disposable import CompositeDisposable, SingleAssignmentDisposable

T = TypeVar("T")

def combine_latest_(*sources: Observable[Any]) -> Observable[Tuple[Any, ...]]:
"""Merges the specified observable sequences into one observable
sequence by creating a tuple whenever any of the
observable sequences produces an element.

Examples:
>>> obs = combine_latest(obs1, obs2, obs3)
def combine_latest_(*sources: Observable[T]) -> Observable[Tuple[T, ...]]:
"""
Combine multiple observable sequences into one by emitting a tuple
containing the latest values from each source whenever any source emits.

The resulting observable emits only after all sources have emitted at
least once, and then emits again whenever any individual source produces
a new value. Completion occurs when all sources complete, or an error
is propagated immediately if any source fails.

Args:
*sources: Variable number of observable sources to combine.

Returns:
An observable sequence containing the result of combining
elements of the sources into a tuple.
Observable[Tuple[T, ...]]: An observable that emits tuples with the
most recent values from all provided sources.

Raises:
ValueError: If no observable sources are provided.

Examples:
>>> obs = combine_latest_(obs1, obs2, obs3)
>>> obs.subscribe(print)
"""
if not sources:
raise ValueError("At least one observable source must be provided.")

parent = sources[0]
lock = getattr(parent, "lock", threading.RLock())

def subscribe(
observer: abc.ObserverBase[Any], scheduler: Optional[abc.SchedulerBase] = None
observer: abc.ObserverBase[Any],
scheduler: Optional[abc.SchedulerBase] = None,
) -> CompositeDisposable:

n = len(sources)
has_value = [False] * n
has_value_all = [False]
is_done = [False] * n
values = [None] * n
has_value: List[bool] = [False] * n
is_done: List[bool] = [False] * n
values: List[Optional[T]] = [None] * n
has_value_all = False

def _next(i: Any) -> None:
def _next(i: int) -> None:
nonlocal has_value_all
has_value[i] = True
has_value_all = all(has_value)

if has_value_all[0] or all(has_value):
res = tuple(values)
observer.on_next(res)

elif all([x for j, x in enumerate(is_done) if j != i]):
if has_value_all:
observer.on_next(tuple(values))
elif all(done for j, done in enumerate(is_done) if j != i):
observer.on_completed()

has_value_all[0] = all(has_value)

def done(i: Any) -> None:
def _done(i: int) -> None:
is_done[i] = True
if all(is_done):
observer.on_completed()

subscriptions: List[Optional[SingleAssignmentDisposable]] = [None] * n
subscriptions: List[SingleAssignmentDisposable] = [
SingleAssignmentDisposable() for _ in range(n)
]

def func(i: int) -> None:
subscriptions[i] = SingleAssignmentDisposable()
for i, source in enumerate(sources):
sad = subscriptions[i]

def on_next(x: Any) -> None:
with parent.lock:
values[i] = x
_next(i)
def on_next(x: Any, index=i) -> None:
with lock:
values[index] = x
_next(index)

def on_completed() -> None:
with parent.lock:
done(i)
def on_completed(index=i) -> None:
with lock:
_done(index)

subscription = subscriptions[i]
assert subscription
subscription.disposable = sources[i].subscribe(
on_next, observer.on_error, on_completed, scheduler=scheduler
sad.disposable = source.subscribe(
on_next,
observer.on_error,
on_completed,
scheduler=scheduler,
)

for idx in range(n):
func(idx)
return CompositeDisposable(subscriptions)

return Observable(subscribe)
return Observable(subscribe=subscribe)


__all__ = ["combine_latest_"]