Skip to content

Commit fc3e08b

Browse files
Merge pull request #503 from Applied-Duality/scala-bindings
New Scala Bindings
2 parents 30b6b08 + 7fd5183 commit fc3e08b

33 files changed

+1606
-1620
lines changed
Lines changed: 128 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,144 @@
1-
# Scala Adaptor for RxJava
2-
3-
This adaptor allows to use RxJava in Scala with anonymous functions, e.g.
1+
Alternative Rx bindings for Scala
2+
=================================
43

4+
The current RxScala binding attempt to optimize for seamless interop between Scala and Java.
5+
The intended interop is illustrated by the following example where in Scala a class is defined that takes
6+
an `Observable[Movie]` that is transformed using RxScala operators:
57
```scala
6-
val o = Observable.interval(200 millis).take(5)
7-
o.subscribe(n => println("n = " + n))
8-
Observable(1, 2, 3, 4).reduce(_ + _)
8+
class MovieLib(val moviesStream: Observable[Movie]) {
9+
val threshold = 1200
10+
def shortMovies: Observable[Movie] = ???
11+
def longMovies: Observable[Movie] = ???
12+
}
913
```
14+
which is then called in Java, passing a Java `Observable<Movie>` to the constructor
15+
```java
16+
public void test() {
17+
MovieLib lib = new MovieLib(Observable.from(...));
1018

11-
For-comprehensions are also supported:
12-
13-
```scala
14-
val first = Observable(10, 11, 12)
15-
val second = Observable(10, 11, 12)
16-
val booleans = for ((n1, n2) <- (first zip second)) yield (n1 == n2)
19+
lib.longMovies().subscribe(moviePrinter);
20+
}
1721
```
18-
19-
Further, this adaptor attempts to expose an API which is as Scala-idiomatic as possible. This means that certain methods have been renamed, their signature was changed, or static methods were changed to instance methods. Some examples:
20-
22+
The technique used to obtain this transparency is to use a value class with a private constructor that implements
23+
the Rx operators in an idiomatic Scala way, and a companion object that is used to construct instances in Scala
2124
```scala
22-
// instead of concat:
23-
def ++[U >: T](that: Observable[U]): Observable[U]
24-
25-
// instance method instead of static:
26-
def zip[U](that: Observable[U]): Observable[(T, U)]
27-
28-
// the implicit evidence argument ensures that dematerialize can only be called on Observables of Notifications:
29-
def dematerialize[U](implicit evidence: T <:< Notification[U]): Observable[U]
30-
31-
// additional type parameter U with lower bound to get covariance right:
32-
def onErrorResumeNext[U >: T](resumeFunction: Throwable => Observable[U]): Observable[U]
25+
object Observable {
26+
def apply[T](asJava: rx.Observable[_ <: T]): Observable[T] = { new Observable[T](asJava) }
27+
}
3328

34-
// curried in Scala collections, so curry fold also here:
35-
def fold[R](initialValue: R)(accumulator: (R, T) => R): Observable[R]
36-
37-
// using Duration instead of (long timepan, TimeUnit duration):
38-
def sample(duration: Duration): Observable[T]
39-
40-
// called skip in Java, but drop in Scala
41-
def drop(n: Int): Observable[T]
42-
43-
// there's only mapWithIndex in Java, because Java doesn't have tuples:
44-
def zipWithIndex: Observable[(T, Int)]
45-
46-
// corresponds to Java's toList:
47-
def toSeq: Observable[Seq[T]]
48-
49-
// the implicit evidence argument ensures that switch can only be called on Observables of Observables:
50-
def switch[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U]
51-
52-
// Java's from becomes apply, and we use Scala Range
53-
def apply(range: Range): Observable[Int]
54-
55-
// use Bottom type:
56-
def never: Observable[Nothing]
29+
class Observable[+T] private[scala] (val asJava: rx.Observable[_ <: T]) extends AnyVal {
30+
// Idiomatic Scala friendly definitions of Rx operators
31+
}
5732
```
58-
59-
Also, the Scala Observable is fully covariant in its type parameter, whereas the Java Observable only achieves partial covariance due to limitations of Java's type system (or if you can fix this, your suggestions are very welcome).
60-
61-
For more examples, see [RxScalaDemo.scala](https://github.com/Netflix/RxJava/blob/master/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala).
62-
63-
Scala code using Rx should only import members from `rx.lang.scala` and below.
64-
65-
66-
## Documentation
67-
68-
The API documentation can be found [here](http://rxscala.github.io/scaladoc/index.html#rx.lang.scala.Observable).
69-
70-
You can build the API documentation yourself by running `./gradlew scaladoc` in the RxJava root directory.
71-
72-
Then navigate to `RxJava/language-adaptors/rxjava-scala/build/docs/scaladoc/index.html` to display it.
73-
74-
75-
## Binaries
76-
77-
Binaries and dependency information for Maven, Ivy, Gradle and others can be found at [http://search.maven.org](http://search.maven.org/#search%7Cga%7C1%7Ca%3A%22rxjava-scala%22).
78-
79-
Example for Maven:
80-
81-
```xml
82-
<dependency>
83-
<groupId>com.netflix.rxjava</groupId>
84-
<artifactId>rxjava-scala</artifactId>
85-
<version>x.y.z</version>
86-
</dependency>
33+
Since `rx.lang.scala.Observable[T] extends AnyVal`, the underlying representation of `rx.lang.scala.Observable[T]`
34+
is the same as `rx.Observable<T>`. Because `rx.lang.scala.Observable[T]` is an opaque type in Scala,
35+
the Scala programmer only sees the Scala-friendly operators.
36+
37+
However, in the current the illusion of interop is quickly lost when going beyond this simple example.
38+
For example but type `Notification[T]` and `Scheduler[T]` are defined using wrappers,
39+
and hence they are not compatible with `Notification<T>` respectively `Scheduler<T>`.
40+
For instance, when materializing an `Observable[T]` in Scala to an `Observable[Notification[T]]`,
41+
we lost the seamless interop with `Observable<Notification<T>>` on the Java side.
42+
43+
However, the real problems with seamless interop show up when we try to creating bindings for other Rx types.
44+
In particular types that have inheritance or more structure.
45+
46+
For example, RxScala currently defines a type synonym `type Observer[-T] = rx.Observer[_ >: T]`,
47+
but no further bindings for observers.
48+
Similarly, for subjects RxScala defines `type Subject[-T, +R] = rx.subjects.Subject[_ >: T, _ <: R]`.
49+
The problem with these definitions is that on the Java side, subjects are defined as:
50+
```scala
51+
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> { …}
8752
```
53+
without binding any of the Rx subjects.
54+
55+
The consequence is that `Subject[S,T]` in Scala is unrelated to `rx.lang.scala.Observable[T]` in Scala,
56+
but shows up as a `rx.Observable[T]`. The problem however is that if we want to expose subjects in Scala
57+
such that they derive from both `Observable[S]` and `Observer[T]` we cannot use the `extend AnyVal` trick
58+
we used for `Observable[T]` and immediately lose transparent interop with Java.
59+
60+
The problem is even worse because `AsyncSubject<T>`, `BehaviorSubject<T>`, … all derive from `Subject<T,T>`,
61+
so if we want them to derive from a common base `Subject[T,T]` type in Scala we lose transparency for those as well.
62+
And again, if we expose the various subjects by extending `AnyVal`, they are useless in Scala because they do not inherit
63+
from a common base type. To avoid implementing all methods of observable and observer on each specific subject
64+
we might add implicit conversions to `Observable[T]` and `Observer[T]` but that still does not give Scala users
65+
a native `Subject[S,T]` type.
66+
```scala
67+
object AsyncSubject {
68+
def apply[T](): AsyncSubject[T] =
69+
new AsyncSubject[T](rx.subjects.AsyncSubject.create())
70+
}
8871

89-
and for Ivy:
72+
class AsyncSubject[T] private [scala] (val inner: rx.subjects.AsyncSubject[T])
73+
extends AnyVal
74+
{ … }
9075

91-
```xml
92-
<dependency org="com.netflix.rxjava" name="rxjava-scala" rev="x.y.z" />
76+
implicit final def asObservable[T](subject: AsyncSubject[T]): Observable[T] =
77+
Observable(subject.inner)
78+
79+
implicit final def asObserver[T](subject: AsyncSubject[T]): Observer[T] =
80+
subject.inner
9381
```
82+
The inheritance problem is not just limited to subjects, but also surfaces for subscriptions.
83+
Rx scala currently defines `type Subscription = rx.Subscription` using a type synonym as well,
84+
and we run into exactly the same problems as with subjects when we try to bind the
85+
various Rx subscriptions `BooleanSubscription`, `SerialSubscription`, etc.
9486

95-
and for sbt:
87+
Since we cannot wrap Rx types in Scala such that they are both (a) transparently interoperable with Java,
88+
and (b) feel native and idiomatic to Scala, we should decide in favor of optimizing RxScala for Scala
89+
and consumption of Rx values from Java but not for Scala as a producer.
9690

91+
If we take that approach, we can make bindings that feels like a completely native Scala library,
92+
without needing any complications of the Scala side.
9793
```scala
98-
libraryDependencies ++= Seq(
99-
"com.netflix.rxjava" % "rxjava-scala" % "x.y.z"
100-
)
94+
object Observer { …}
95+
trait Observable[+T] {
96+
def asJavaObservable: rx.Observable[_ <: T]
97+
}
98+
99+
object Observer {…}
100+
trait Observer[-T] {
101+
def asJavaObserver: rx.Observer[_ >: T]
102+
}
103+
104+
object Subject {…}
105+
trait Subject[-T, +R] extends Observable[R] with Observer[T] {
106+
val asJavaSubject: rx.subjects.Subject[_ >: T, _<: R]
107+
}
108+
109+
object Scheduler {…}
110+
trait Scheduler {
111+
def asJavaScheduler: rx.Scheduler;
112+
}
113+
114+
object Notification {…}
115+
trait Notification[+T] {
116+
def asJavaNotification: rx.Notification[_ <: T]
117+
}
118+
119+
object Subscription {…}
120+
trait Subscription {
121+
def asJavaSubscription: rx.Subscription
122+
}
123+
```
124+
You pay the price when crossing the Scala/Java interop boundary, which is where it should be.
125+
The proper way is to put the burden of interop on the Scala side, in case you want to create
126+
a reusable Rx-based library in Scala, or wrap and unwrap on the Java side.
127+
```java
128+
public static void main(String[] args) {
129+
130+
Observable<Movie> movies = Observable.from(new Movie(3000), new Movie(1000), new Movie(2000));
131+
MovieLib lib = new MovieLib(toScalaObservable(movies));
132+
lib.longMovies().asJavaObservable().subscribe(m ->
133+
System.out.println("A movie of length " + m.lengthInSeconds() + "s")
134+
);
135+
}
101136
```
137+
Delegation versus Inheritance
138+
-----------------------------
139+
The obvious thought is that using delegation instead of inheritance (http://c2.com/cgi/wiki?DelegationIsInheritance)
140+
will lead to excessive wrapping, since all Scala types wrap and delegate to an underlying RxJava implementation.
141+
Note however, that the wrapping happens at query generation time and incurs no overhead when messages are flowing
142+
through the pipeline. Say we have a query `xs.map(f).filter(p).subscribe(o)`. Even though the Scala types are wrappers,
143+
the callback that is registered with xs is something like `x => { val y = f(x); if(p(y)){ o.asJavaObserver.onNext(y) }}`
144+
and hence there is no additional runtime penalty.
Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,25 +15,31 @@
1515
*/
1616
package rx.lang.scala.examples;
1717

18-
import org.junit.Test;
19-
2018
import rx.Observable;
19+
import rx.lang.scala.examples.Movie;
20+
import rx.lang.scala.examples.MovieLib;
2121
import rx.util.functions.Action1;
22-
22+
import static rx.lang.scala.ImplicitFunctionConversions.toScalaObservable;
2323

2424
public class MovieLibUsage {
25-
26-
Action1<Movie> moviePrinter = new Action1<Movie>() {
27-
public void call(Movie m) {
28-
System.out.println("A movie of length " + m.lengthInSeconds() + "s");
29-
}
30-
};
31-
32-
@Test
33-
public void test() {
34-
MovieLib lib = new MovieLib(Observable.from(new Movie(3000), new Movie(1000), new Movie(2000)));
35-
36-
lib.longMovies().subscribe(moviePrinter);
37-
}
25+
26+
public static void main(String[] args) {
27+
28+
Observable<Movie> movies = Observable.from(
29+
new Movie(3000),
30+
new Movie(1000),
31+
new Movie(2000)
32+
);
33+
34+
MovieLib lib = new MovieLib(toScalaObservable(movies));
35+
36+
lib.longMovies().asJavaObservable().subscribe(new Action1<Movie>() {
37+
38+
@Override
39+
public void call(Movie m) {
40+
System.out.println("A movie of length " + m.lengthInSeconds() + "s");
41+
}
42+
});
43+
}
3844

3945
}
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,11 +20,11 @@ import rx.lang.scala.Observable
2020
class Movie(val lengthInSeconds: Int) { }
2121

2222
class MovieLib(val moviesStream: Observable[Movie]) {
23-
23+
2424
val threshold = 1200
25-
25+
2626
def shortMovies: Observable[Movie] = moviesStream.filter(_.lengthInSeconds <= threshold)
27-
27+
2828
def longMovies: Observable[Movie] = moviesStream.filter(_.lengthInSeconds > threshold)
2929

3030
}

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/Olympics.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import scala.concurrent.duration._
2020

2121
object Olympics {
2222
case class Medal(val year: Int, val games: String, val discipline: String, val medal: String, val athlete: String, val country: String)
23-
23+
2424
def mountainBikeMedals: Observable[Medal] = Observable(
2525
Observable(
2626
Medal(1996, "Atlanta 1996", "cross-country men", "Gold", "Bart BRENTJENS", "Netherlands"),
@@ -31,7 +31,7 @@ object Olympics {
3131
Medal(1996, "Atlanta 1996", "cross-country women", "Bronze", "Susan DEMATTEI", "United States of America")
3232
),
3333
fourYearsEmpty,
34-
Observable(
34+
Observable(
3535
Medal(2000, "Sydney 2000", "cross-country women", "Gold", "Paola PEZZO", "Italy"),
3636
Medal(2000, "Sydney 2000", "cross-country women", "Silver", "Barbara BLATTER", "Switzerland"),
3737
Medal(2000, "Sydney 2000", "cross-country women", "Bronze", "Marga FULLANA", "Spain"),
@@ -40,7 +40,7 @@ object Olympics {
4040
Medal(2000, "Sydney 2000", "cross-country men", "Bronze", "Christoph SAUSER", "Switzerland")
4141
),
4242
fourYearsEmpty,
43-
Observable(
43+
Observable(
4444
Medal(2004, "Athens 2004", "cross-country men", "Gold", "Julien ABSALON", "France"),
4545
Medal(2004, "Athens 2004", "cross-country men", "Silver", "Jose Antonio HERMIDA RAMOS", "Spain"),
4646
Medal(2004, "Athens 2004", "cross-country men", "Bronze", "Bart BRENTJENS", "Netherlands"),
@@ -49,7 +49,7 @@ object Olympics {
4949
Medal(2004, "Athens 2004", "cross-country women", "Bronze", "Sabine SPITZ", "Germany")
5050
),
5151
fourYearsEmpty,
52-
Observable(
52+
Observable(
5353
Medal(2008, "Beijing 2008", "cross-country women", "Gold", "Sabine SPITZ", "Germany"),
5454
Medal(2008, "Beijing 2008", "cross-country women", "Silver", "Maja WLOSZCZOWSKA", "Poland"),
5555
Medal(2008, "Beijing 2008", "cross-country women", "Bronze", "Irina KALENTYEVA", "Russian Federation"),
@@ -67,12 +67,12 @@ object Olympics {
6767
Medal(2012, "London 2012", "cross-country women", "Bronze", "Georgia GOULD", "United States of America")
6868
)
6969
).concat
70-
70+
7171
// speed it up :D
7272
val fourYears = 4000.millis
73-
73+
7474
val neverUsedDummyMedal = Medal(3333, "?", "?", "?", "?", "?")
75-
75+
7676
def fourYearsEmpty: Observable[Medal] = {
7777
// TODO this should return an observable which emits nothing during fourYears and then completes
7878
// Because of https://github.com/Netflix/RxJava/issues/388, we get non-terminating tests
@@ -82,5 +82,5 @@ object Olympics {
8282
// But we just return empty, which completes immediately
8383
Observable()
8484
}
85-
85+
8686
}

0 commit comments

Comments
 (0)