Skip to content

Commit c67792a

Browse files
committed
Added subscribeBy(DisposableContainer, [...]) extensions.
1 parent 8396845 commit c67792a

File tree

8 files changed

+839
-39
lines changed

8 files changed

+839
-39
lines changed

build.gradle.kts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ dependencies {
4141
api("io.reactivex.rxjava3:rxjava:3.1.0")
4242
implementation(kotlin("stdlib"))
4343

44-
testImplementation("org.funktionale:funktionale-partials:1.0.0-final")
4544
testImplementation("junit:junit:4.12")
4645
testImplementation("org.mockito:mockito-core:1.10.19")
4746

@@ -60,7 +59,6 @@ val sourcesJar by tasks.creating(Jar::class) {
6059
val dokka by tasks.getting(DokkaTask::class) {
6160
outputFormat = "html"
6261
outputDirectory = "$buildDir/javadoc"
63-
6462
}
6563

6664
//documentation
@@ -145,7 +143,7 @@ bintray {
145143

146144
setPublications(if (isRelease) release else snapshot)
147145

148-
// dryRun = true
146+
// dryRun = true
149147

150148
with(pkg) {
151149
userOrg = "reactivex"

src/main/kotlin/io/reactivex/rxjava3/kotlin/subscribers.kt

Lines changed: 84 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,11 @@ import io.reactivex.rxjava3.annotations.CheckReturnValue
66
import io.reactivex.rxjava3.annotations.SchedulerSupport
77
import io.reactivex.rxjava3.core.*
88
import io.reactivex.rxjava3.disposables.Disposable
9+
import io.reactivex.rxjava3.disposables.DisposableContainer
910
import io.reactivex.rxjava3.functions.Action
1011
import io.reactivex.rxjava3.functions.Consumer
1112
import io.reactivex.rxjava3.internal.functions.Functions
1213

13-
1414
private val onNextStub: (Any) -> Unit = {}
1515
private val onErrorStub: (Throwable) -> Unit = {}
1616
private val onCompleteStub: () -> Unit = {}
@@ -33,9 +33,9 @@ private fun (() -> Unit).asOnCompleteAction(): Action {
3333
@CheckReturnValue
3434
@SchedulerSupport(SchedulerSupport.NONE)
3535
fun <T : Any> Observable<T>.subscribeBy(
36-
onError: (Throwable) -> Unit = onErrorStub,
37-
onComplete: () -> Unit = onCompleteStub,
38-
onNext: (T) -> Unit = onNextStub
36+
onError: (Throwable) -> Unit = onErrorStub,
37+
onComplete: () -> Unit = onCompleteStub,
38+
onNext: (T) -> Unit = onNextStub
3939
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
4040

4141
/**
@@ -45,9 +45,9 @@ fun <T : Any> Observable<T>.subscribeBy(
4545
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
4646
@SchedulerSupport(SchedulerSupport.NONE)
4747
fun <T : Any> Flowable<T>.subscribeBy(
48-
onError: (Throwable) -> Unit = onErrorStub,
49-
onComplete: () -> Unit = onCompleteStub,
50-
onNext: (T) -> Unit = onNextStub
48+
onError: (Throwable) -> Unit = onErrorStub,
49+
onComplete: () -> Unit = onCompleteStub,
50+
onNext: (T) -> Unit = onNextStub
5151
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
5252

5353
/**
@@ -56,8 +56,8 @@ fun <T : Any> Flowable<T>.subscribeBy(
5656
@CheckReturnValue
5757
@SchedulerSupport(SchedulerSupport.NONE)
5858
fun <T : Any> Single<T>.subscribeBy(
59-
onError: (Throwable) -> Unit = onErrorStub,
60-
onSuccess: (T) -> Unit = onNextStub
59+
onError: (Throwable) -> Unit = onErrorStub,
60+
onSuccess: (T) -> Unit = onNextStub
6161
): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer())
6262

6363
/**
@@ -66,9 +66,9 @@ fun <T : Any> Single<T>.subscribeBy(
6666
@CheckReturnValue
6767
@SchedulerSupport(SchedulerSupport.NONE)
6868
fun <T : Any> Maybe<T>.subscribeBy(
69-
onError: (Throwable) -> Unit = onErrorStub,
70-
onComplete: () -> Unit = onCompleteStub,
71-
onSuccess: (T) -> Unit = onNextStub
69+
onError: (Throwable) -> Unit = onErrorStub,
70+
onComplete: () -> Unit = onCompleteStub,
71+
onSuccess: (T) -> Unit = onNextStub
7272
): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
7373

7474
/**
@@ -77,8 +77,8 @@ fun <T : Any> Maybe<T>.subscribeBy(
7777
@CheckReturnValue
7878
@SchedulerSupport(SchedulerSupport.NONE)
7979
fun Completable.subscribeBy(
80-
onError: (Throwable) -> Unit = onErrorStub,
81-
onComplete: () -> Unit = onCompleteStub
80+
onError: (Throwable) -> Unit = onErrorStub,
81+
onComplete: () -> Unit = onCompleteStub
8282
): Disposable = when {
8383
// There are optimized versions of the completable Consumers, so we need to use the subscribe overloads
8484
// here.
@@ -87,14 +87,69 @@ fun Completable.subscribeBy(
8787
else -> subscribe(onComplete.asOnCompleteAction(), Consumer(onError))
8888
}
8989

90+
/**
91+
* Overloaded subscribe function that allows passing named parameters
92+
*/
93+
@SchedulerSupport(SchedulerSupport.NONE)
94+
fun <T : Any> Observable<T>.subscribeBy(
95+
container: DisposableContainer,
96+
onError: (Throwable) -> Unit = onErrorStub,
97+
onComplete: () -> Unit = onCompleteStub,
98+
onNext: (T) -> Unit = onNextStub
99+
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container)
100+
101+
/**
102+
* Overloaded subscribe function that allows passing named parameters
103+
*/
104+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
105+
@SchedulerSupport(SchedulerSupport.NONE)
106+
fun <T : Any> Flowable<T>.subscribeBy(
107+
container: DisposableContainer,
108+
onError: (Throwable) -> Unit = onErrorStub,
109+
onComplete: () -> Unit = onCompleteStub,
110+
onNext: (T) -> Unit = onNextStub
111+
): Disposable = subscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container)
112+
113+
/**
114+
* Overloaded subscribe function that allows passing named parameters
115+
*/
116+
@SchedulerSupport(SchedulerSupport.NONE)
117+
fun <T : Any> Single<T>.subscribeBy(
118+
container: DisposableContainer,
119+
onError: (Throwable) -> Unit = onErrorStub,
120+
onSuccess: (T) -> Unit = onNextStub
121+
): Disposable = subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), container)
122+
123+
/**
124+
* Overloaded subscribe function that allows passing named parameters
125+
*/
126+
@SchedulerSupport(SchedulerSupport.NONE)
127+
fun <T : Any> Maybe<T>.subscribeBy(
128+
container: DisposableContainer,
129+
onError: (Throwable) -> Unit = onErrorStub,
130+
onComplete: () -> Unit = onCompleteStub,
131+
onSuccess: (T) -> Unit = onNextStub
132+
): Disposable =
133+
subscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction(), container)
134+
135+
/**
136+
* Overloaded subscribe function that allows passing named parameters
137+
*/
138+
@SchedulerSupport(SchedulerSupport.NONE)
139+
fun Completable.subscribeBy(
140+
container: DisposableContainer,
141+
onError: (Throwable) -> Unit = onErrorStub,
142+
onComplete: () -> Unit = onCompleteStub
143+
): Disposable = subscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer(), container)
144+
90145
/**
91146
* Overloaded blockingSubscribe function that allows passing named parameters
92147
*/
93148
@SchedulerSupport(SchedulerSupport.NONE)
94149
fun <T : Any> Observable<T>.blockingSubscribeBy(
95-
onError: (Throwable) -> Unit = onErrorStub,
96-
onComplete: () -> Unit = onCompleteStub,
97-
onNext: (T) -> Unit = onNextStub
150+
onError: (Throwable) -> Unit = onErrorStub,
151+
onComplete: () -> Unit = onCompleteStub,
152+
onNext: (T) -> Unit = onNextStub
98153
): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
99154

100155
/**
@@ -103,35 +158,35 @@ fun <T : Any> Observable<T>.blockingSubscribeBy(
103158
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
104159
@SchedulerSupport(SchedulerSupport.NONE)
105160
fun <T : Any> Flowable<T>.blockingSubscribeBy(
106-
onError: (Throwable) -> Unit = onErrorStub,
107-
onComplete: () -> Unit = onCompleteStub,
108-
onNext: (T) -> Unit = onNextStub
161+
onError: (Throwable) -> Unit = onErrorStub,
162+
onComplete: () -> Unit = onCompleteStub,
163+
onNext: (T) -> Unit = onNextStub
109164
): Unit = blockingSubscribe(onNext.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
110165

111166
/**
112167
* Overloaded blockingSubscribe function that allows passing named parameters
113168
*/
114169
@SchedulerSupport(SchedulerSupport.NONE)
115170
fun <T : Any> Maybe<T>.blockingSubscribeBy(
116-
onError: (Throwable) -> Unit = onErrorStub,
117-
onComplete: () -> Unit = onCompleteStub,
118-
onSuccess: (T) -> Unit = onNextStub
119-
) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
171+
onError: (Throwable) -> Unit = onErrorStub,
172+
onComplete: () -> Unit = onCompleteStub,
173+
onSuccess: (T) -> Unit = onNextStub
174+
): Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer(), onComplete.asOnCompleteAction())
120175

121176
/**
122177
* Overloaded blockingSubscribe function that allows passing named parameters
123178
*/
124179
@SchedulerSupport(SchedulerSupport.NONE)
125180
fun <T : Any> Single<T>.blockingSubscribeBy(
126-
onError: (Throwable) -> Unit = onErrorStub,
127-
onSuccess: (T) -> Unit = onNextStub
128-
) : Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer())
181+
onError: (Throwable) -> Unit = onErrorStub,
182+
onSuccess: (T) -> Unit = onNextStub
183+
): Unit = blockingSubscribe(onSuccess.asConsumer(), onError.asOnErrorConsumer())
129184

130185
/**
131186
* Overloaded blockingSubscribe function that allows passing named parameters
132187
*/
133188
@SchedulerSupport(SchedulerSupport.NONE)
134189
fun Completable.blockingSubscribeBy(
135-
onError: (Throwable) -> Unit = onErrorStub,
136-
onComplete: () -> Unit = onCompleteStub
190+
onError: (Throwable) -> Unit = onErrorStub,
191+
onComplete: () -> Unit = onCompleteStub
137192
): Unit = blockingSubscribe(onComplete.asOnCompleteAction(), onError.asOnErrorConsumer())
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
/*
2+
* Copyright (c) 2021-present, RxKotlin Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.reactivex.rxjava3.kotlin
18+
19+
import io.reactivex.rxjava3.disposables.CompositeDisposable
20+
import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection
21+
import io.reactivex.rxjava3.subjects.CompletableSubject
22+
import org.junit.Assert.*
23+
import org.junit.Test
24+
import java.io.IOException
25+
26+
class CompletableConsumersTest {
27+
28+
private fun CompositeDisposable.isEmpty(): Boolean = size() == 0
29+
private fun CompositeDisposable.isNotEmpty(): Boolean = size() > 0
30+
31+
private val disposables = CompositeDisposable()
32+
private val subject = CompletableSubject.create()
33+
private val events = mutableListOf<Any>()
34+
35+
@Test
36+
fun errorIntrospectionNormal() {
37+
val disposable = subject.subscribeBy(disposables) as LambdaConsumerIntrospection
38+
assertFalse(disposable.hasCustomOnError())
39+
}
40+
41+
@Test
42+
fun errorIntrospectionCustom() {
43+
val disposable = subject.subscribeBy(disposables, onError = {}) as LambdaConsumerIntrospection
44+
assertTrue(disposable.hasCustomOnError())
45+
}
46+
47+
@Test
48+
fun onErrorNormal() {
49+
subject.subscribeBy(
50+
disposables,
51+
onError = events::add
52+
)
53+
54+
assertTrue(disposables.isNotEmpty())
55+
assertTrue(events.isEmpty())
56+
57+
subject.onComplete()
58+
59+
assertTrue(disposables.isEmpty())
60+
assertTrue(events.isEmpty())
61+
}
62+
63+
@Test
64+
fun onErrorError() {
65+
subject.subscribeBy(
66+
disposables,
67+
onError = events::add
68+
)
69+
70+
assertTrue(disposables.isNotEmpty())
71+
assertTrue(events.isEmpty())
72+
73+
subject.onError(IOException())
74+
75+
assertTrue(disposables.isEmpty())
76+
assertEquals(1, events.size)
77+
assertTrue(events[0] is IOException)
78+
}
79+
80+
@Test
81+
fun onCompleteNormal() {
82+
subject.subscribeBy(
83+
disposables,
84+
onError = events::add,
85+
onComplete = { events.add("completed") }
86+
)
87+
88+
assertTrue(disposables.isNotEmpty())
89+
assertTrue(events.isEmpty())
90+
91+
subject.onComplete()
92+
93+
assertTrue(disposables.isEmpty())
94+
assertEquals(listOf("completed"), events)
95+
}
96+
97+
@Test
98+
fun onCompleteError() {
99+
subject.subscribeBy(
100+
disposables,
101+
onError = events::add,
102+
onComplete = { events.add("completed") }
103+
)
104+
105+
assertTrue(disposables.isNotEmpty())
106+
assertTrue(events.isEmpty())
107+
108+
subject.onError(IOException())
109+
110+
assertTrue(disposables.isEmpty())
111+
assertEquals(1, events.size)
112+
assertTrue(events[0] is IOException)
113+
}
114+
115+
@Test
116+
fun onCompleteDispose() {
117+
val disposable = subject.subscribeBy(
118+
disposables,
119+
onError = events::add,
120+
onComplete = { events.add("completed") }
121+
)
122+
123+
assertFalse(disposable.isDisposed)
124+
assertTrue(disposables.isNotEmpty())
125+
assertTrue(events.isEmpty())
126+
127+
disposable.dispose()
128+
129+
assertTrue(disposable.isDisposed)
130+
assertTrue(disposables.isEmpty())
131+
assertTrue(events.isEmpty())
132+
}
133+
}

src/test/kotlin/io/reactivex/rxjava3/kotlin/ExtensionTests.kt

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import io.reactivex.rxjava3.core.Observable
2121
import io.reactivex.rxjava3.core.ObservableEmitter
2222
import io.reactivex.rxjava3.schedulers.TestScheduler
2323
import io.reactivex.rxjava3.functions.*
24-
import org.funktionale.partials.invoke
2524
import org.junit.Assert.assertEquals
2625
import org.junit.Assert.fail
2726
import org.junit.Test
@@ -248,11 +247,6 @@ class ExtensionTests : KotlinTests() {
248247
inOrder.verifyNoMoreInteractions()
249248
}
250249

251-
val funOnSubscribe: (Int, ObservableEmitter<in String>) -> Unit = { counter, subscriber ->
252-
subscriber.onNext("hello_$counter")
253-
subscriber.onComplete()
254-
}
255-
256250
val asyncObservable: (ObservableEmitter<in Int>) -> Unit = { subscriber ->
257251
thread {
258252
Thread.sleep(50)
@@ -270,7 +264,11 @@ class ExtensionTests : KotlinTests() {
270264
get() = listOf(1, 3, 2, 5, 4).toObservable()
271265

272266
val onSubscribe: (ObservableEmitter<in String>) -> Unit
273-
get() = funOnSubscribe(p1 = counter++) // partial applied function
267+
get() = {
268+
it.onNext("hello_$counter")
269+
it.onComplete()
270+
counter ++
271+
}
274272

275273
val observable: Observable<String>
276274
get() = Observable.create(onSubscribe)

0 commit comments

Comments
 (0)