From d079c416e40c9d60bdf4cc5c0e275ab2237109ce Mon Sep 17 00:00:00 2001 From: ferologics Date: Wed, 9 Jun 2021 20:44:28 +0200 Subject: [PATCH 1/3] =?UTF-8?q?=F0=9F=86=95=20Add=20a=20'collect(until:)'?= =?UTF-8?q?=20operator?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Sources/Operators/CollectUntilTrigger.swift | 51 +++++++++++++++++++++ Tests/CollectUntilTriggerTests.swift | 42 +++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 Sources/Operators/CollectUntilTrigger.swift create mode 100644 Tests/CollectUntilTriggerTests.swift diff --git a/Sources/Operators/CollectUntilTrigger.swift b/Sources/Operators/CollectUntilTrigger.swift new file mode 100644 index 0000000..6675fb0 --- /dev/null +++ b/Sources/Operators/CollectUntilTrigger.swift @@ -0,0 +1,51 @@ +// +// CollectUntilTrigger.swift +// CombineExt +// +// Created by ferologics on 09/06/2021. +// Copyright © 2021 Combine Community. All rights reserved. +// + +#if canImport(Combine) +import Combine + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publisher { + func collect( + until trigger: CollectionTrigger + ) -> AnyPublisher<[Output], Failure> where + CollectionTrigger: Publisher, + CollectionTrigger.Output == Void, + CollectionTrigger.Failure == Never { + var events = [Output]() + + let eventPublisher = PassthroughSubject<[Output], Failure>() + var cancellables = [AnyCancellable]() + + self.sink { completion in + eventPublisher.send(completion: completion) + } receiveValue: { output in + events.append(output) + } + .store(in: &cancellables) + + trigger.sink { _ in + eventPublisher.send(events) + events = [] + } + .store(in: &cancellables) + + func cleanUp() { + cancellables.forEach { $0.cancel() } + cancellables = [] + } + + return eventPublisher + .handleEvents( + receiveCompletion: { _ in cleanUp() }, + receiveCancel: { cleanUp() } + ) + .eraseToAnyPublisher() + } +} +#endif diff --git a/Tests/CollectUntilTriggerTests.swift b/Tests/CollectUntilTriggerTests.swift new file mode 100644 index 0000000..fdcd921 --- /dev/null +++ b/Tests/CollectUntilTriggerTests.swift @@ -0,0 +1,42 @@ +// +// CollectUntilTriggerTests.swift +// CombineExtTests +// +// Created by ferologics on 09/06/2021. +// Copyright © 2020 Combine Community. All rights reserved. +// + +#if !os(watchOS) +import XCTest +import Combine + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +class CollectUntilTriggerTests: XCTestCase { + var subscription: AnyCancellable! + + func test() { + // Given + let elements = [1,2,3,4,5] + var receivedElements = [Int]() + let elementsPublisher = PassthroughSubject() + let trigger = PassthroughSubject() + + // When + subscription = elementsPublisher + .collect(until: trigger) + .sink { receivedElements = $0 } + + for x in elements { + elementsPublisher.send(x) + } + + // Then + XCTAssertTrue(receivedElements.isEmpty) + trigger.send(()) + XCTAssertEqual(elements.count, receivedElements.count) + for (a, b) in zip(elements, receivedElements) { + XCTAssertEqual(a, b) + } + } +} +#endif From aee68cf700a0dfbe9bfb0033c407c1070ad1231b Mon Sep 17 00:00:00 2001 From: ferologics Date: Mon, 1 Nov 2021 10:09:17 +0100 Subject: [PATCH 2/3] =?UTF-8?q?=E2=9C=8F=EF=B8=8F=20Fix=20a=20typo?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Sources/Operators/Dematerialize.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Operators/Dematerialize.swift b/Sources/Operators/Dematerialize.swift index 663f776..eabee4f 100644 --- a/Sources/Operators/Dematerialize.swift +++ b/Sources/Operators/Dematerialize.swift @@ -40,7 +40,7 @@ public extension Publishers { } } -// MARK: - Subscrription +// MARK: - Subscription @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) private extension Publishers.Dematerialize { class Subscription: Combine.Subscription From 47de41b12f115671ff02b66725c8f56e63593a2d Mon Sep 17 00:00:00 2001 From: ferologics Date: Mon, 1 Nov 2021 10:09:57 +0100 Subject: [PATCH 3/3] =?UTF-8?q?=E2=9C=A8=20Add=20'collect(until:)'=20opera?= =?UTF-8?q?tor=20(rewrite)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Sources/Operators/CollectUntilTrigger.swift | 157 +++++++++++++++----- 1 file changed, 122 insertions(+), 35 deletions(-) diff --git a/Sources/Operators/CollectUntilTrigger.swift b/Sources/Operators/CollectUntilTrigger.swift index 6675fb0..d359349 100644 --- a/Sources/Operators/CollectUntilTrigger.swift +++ b/Sources/Operators/CollectUntilTrigger.swift @@ -11,41 +11,128 @@ import Combine @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) public extension Publisher { - func collect( - until trigger: CollectionTrigger - ) -> AnyPublisher<[Output], Failure> where - CollectionTrigger: Publisher, - CollectionTrigger.Output == Void, - CollectionTrigger.Failure == Never { - var events = [Output]() - - let eventPublisher = PassthroughSubject<[Output], Failure>() - var cancellables = [AnyCancellable]() - - self.sink { completion in - eventPublisher.send(completion: completion) - } receiveValue: { output in - events.append(output) - } - .store(in: &cancellables) - - trigger.sink { _ in - eventPublisher.send(events) - events = [] - } - .store(in: &cancellables) - - func cleanUp() { - cancellables.forEach { $0.cancel() } - cancellables = [] - } - - return eventPublisher - .handleEvents( - receiveCompletion: { _ in cleanUp() }, - receiveCancel: { cleanUp() } - ) - .eraseToAnyPublisher() + func collect( + until trigger: Trigger + ) -> Publishers.CollectUntilTrigger where + Trigger.Output == Void, + Trigger.Failure == Never + { + Publishers.CollectUntilTrigger( + upstream: self, + trigger: trigger + ) + } +} + +// MARK: - Publisher + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public extension Publishers { + struct CollectUntilTrigger< + Upstream: Publisher, + Trigger: Publisher + >: Publisher where + Trigger.Output == Void, + Trigger.Failure == Never + { + public typealias Output = [Upstream.Output] + public typealias Failure = Upstream.Failure + + private let upstream: Upstream + private let trigger: Trigger + + init(upstream: Upstream, trigger: Trigger) { + self.upstream = upstream + self.trigger = trigger + } + + public func receive(subscriber: S) + where Failure == S.Failure, Output == S.Input + { + subscriber.receive( + subscription: Subscription( + upstream: upstream, + downstream: subscriber, + trigger: trigger + ) + ) } + } } + +// MARK: - Subscription + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.CollectUntilTrigger { + final class Subscription< + Downstream: Subscriber + >: Combine.Subscription where + Downstream.Input == [Upstream.Output], + Downstream.Failure == Upstream.Failure + { + private var sink: Sink? + private var cancellable: Cancellable? + + init( + upstream: Upstream, + downstream: Downstream, + trigger: Trigger + ) { + self.sink = Sink( + upstream: upstream, + downstream: downstream + ) + + cancellable = trigger.sink { [self] in + _ = sink?.buffer.buffer(value: sink?.elements ?? []) + _ = sink?.buffer.demand(.max(1)) + sink?.flush() + } + } + + func request(_ demand: Subscribers.Demand) { + sink?.demand(demand) + } + + func cancel() { + sink = nil + cancellable?.cancel() + cancellable = nil + } + + var description: String { + return "CollectUntilTrigger.Subscription<\(Downstream.Input.self), \(Downstream.Failure.self)>" + } + } +} + +// MARK: - Sink + +@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +private extension Publishers.CollectUntilTrigger { + final class Sink< + Downstream: Subscriber + >: CombineExt.Sink where + Downstream.Input == [Upstream.Output], + Downstream.Failure == Upstream.Failure + { + private let lock = NSRecursiveLock() + var elements: [Upstream.Output] = [] + + override func receive(_ input: Upstream.Output) -> Subscribers.Demand { + lock.lock() + defer { lock.unlock() } + elements.append(input) + return .none + } + + func flush() { + lock.lock() + defer { lock.unlock() } + elements = [] + } + } +} + #endif +