Skip to content

Commit 4cc3739

Browse files
Merge pull request #378 from benjchristensen/tests
UnitTests while working on EventStream use cases
2 parents df358b1 + be80eba commit 4cc3739

File tree

4 files changed

+241
-0
lines changed

4 files changed

+241
-0
lines changed
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package rx;
2+
3+
import java.util.Collections;
4+
import java.util.LinkedHashMap;
5+
import java.util.Map;
6+
7+
import rx.Observable.OnSubscribeFunc;
8+
import rx.concurrency.Schedulers;
9+
import rx.subscriptions.BooleanSubscription;
10+
import rx.util.functions.Action0;
11+
12+
/**
13+
* Utility for retrieving a mock eventstream for testing.
14+
*/
15+
public class EventStream {
16+
17+
public static Observable<Event> getEventStream(final String type, final int numInstances) {
18+
return Observable.create(new OnSubscribeFunc<Event>() {
19+
20+
@Override
21+
public Subscription onSubscribe(final Observer<? super Event> observer) {
22+
final BooleanSubscription s = new BooleanSubscription();
23+
// run on a background thread inside the OnSubscribeFunc so unsubscribe works
24+
Schedulers.newThread().schedule(new Action0() {
25+
26+
@Override
27+
public void call() {
28+
while (!(s.isUnsubscribed() || Thread.currentThread().isInterrupted())) {
29+
observer.onNext(randomEvent(type, numInstances));
30+
try {
31+
// slow it down somewhat
32+
Thread.sleep(50);
33+
} catch (InterruptedException e) {
34+
observer.onError(e);
35+
}
36+
}
37+
observer.onCompleted();
38+
}
39+
40+
});
41+
42+
return s;
43+
}
44+
});
45+
}
46+
47+
public static Event randomEvent(String type, int numInstances) {
48+
Map<String, Object> values = new LinkedHashMap<String, Object>();
49+
values.put("count200", randomIntFrom0to(4000));
50+
values.put("count4xx", randomIntFrom0to(300));
51+
values.put("count5xx", randomIntFrom0to(500));
52+
return new Event(type, "instance_" + randomIntFrom0to(numInstances), values);
53+
}
54+
55+
private static int randomIntFrom0to(int max) {
56+
// XORShift instead of Math.random http://javamex.com/tutorials/random_numbers/xorshift.shtml
57+
long x = System.nanoTime();
58+
x ^= (x << 21);
59+
x ^= (x >>> 35);
60+
x ^= (x << 4);
61+
return Math.abs((int) x % max);
62+
}
63+
64+
public static class Event {
65+
public final String type;
66+
public final String instanceId;
67+
public final Map<String, Object> values;
68+
69+
/**
70+
* @param type
71+
* @param instanceId
72+
* @param values
73+
* This does NOT deep-copy, so do not mutate this Map after passing it in.
74+
*/
75+
public Event(String type, String instanceId, Map<String, Object> values) {
76+
this.type = type;
77+
this.instanceId = instanceId;
78+
this.values = Collections.unmodifiableMap(values);
79+
}
80+
}
81+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package rx;
2+
3+
import org.junit.Test;
4+
5+
import rx.EventStream.Event;
6+
import rx.observables.GroupedObservable;
7+
import rx.util.functions.Action1;
8+
import rx.util.functions.Func1;
9+
10+
public class GroupByTests {
11+
12+
@Test
13+
public void testTakeUnsubscribesOnGroupBy() {
14+
Observable.merge(
15+
EventStream.getEventStream("HTTP-ClusterA", 50),
16+
EventStream.getEventStream("HTTP-ClusterB", 20))
17+
// group by type (2 clusters)
18+
.groupBy(new Func1<Event, String>() {
19+
20+
@Override
21+
public String call(Event event) {
22+
return event.type;
23+
}
24+
25+
}).take(1)
26+
.toBlockingObservable().forEach(new Action1<GroupedObservable<String, Event>>() {
27+
28+
@Override
29+
public void call(GroupedObservable<String, Event> g) {
30+
System.out.println(g);
31+
}
32+
33+
});
34+
35+
System.out.println("**** finished");
36+
}
37+
38+
@Test
39+
public void testTakeUnsubscribesOnFlatMapOfGroupBy() {
40+
Observable.merge(
41+
EventStream.getEventStream("HTTP-ClusterA", 50),
42+
EventStream.getEventStream("HTTP-ClusterB", 20))
43+
// group by type (2 clusters)
44+
.groupBy(new Func1<Event, String>() {
45+
46+
@Override
47+
public String call(Event event) {
48+
return event.type;
49+
}
50+
51+
})
52+
.flatMap(new Func1<GroupedObservable<String, Event>, Observable<String>>() {
53+
54+
@Override
55+
public Observable<String> call(GroupedObservable<String, Event> g) {
56+
return g.map(new Func1<Event, String>() {
57+
58+
@Override
59+
public String call(Event event) {
60+
return event.instanceId + " - " + event.values.get("count200");
61+
}
62+
});
63+
}
64+
65+
})
66+
.take(20)
67+
.toBlockingObservable().forEach(new Action1<String>() {
68+
69+
@Override
70+
public void call(String v) {
71+
System.out.println(v);
72+
}
73+
74+
});
75+
76+
System.out.println("**** finished");
77+
}
78+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package rx;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
6+
import org.junit.Test;
7+
8+
import rx.EventStream.Event;
9+
import rx.util.functions.Action1;
10+
import rx.util.functions.Func2;
11+
12+
public class ScanTests {
13+
14+
@Test
15+
public void testUnsubscribeScan() {
16+
17+
EventStream.getEventStream("HTTP-ClusterB", 20)
18+
.scan(new HashMap<String, String>(), new Func2<Map<String, String>, Event, Map<String, String>>() {
19+
20+
@Override
21+
public Map<String, String> call(Map<String, String> accum, Event perInstanceEvent) {
22+
accum.put("instance", perInstanceEvent.instanceId);
23+
return accum;
24+
}
25+
26+
})
27+
.take(10)
28+
.toBlockingObservable().forEach(new Action1<Map<String, String>>() {
29+
30+
@Override
31+
public void call(Map<String, String> v) {
32+
System.out.println(v);
33+
}
34+
35+
});
36+
}
37+
}

rxjava-core/src/test/java/rx/ZipTests.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package rx;
22

3+
import java.util.HashMap;
4+
import java.util.Map;
5+
36
import org.junit.Test;
47

58
import rx.CovarianceTest.CoolRating;
@@ -9,11 +12,53 @@
912
import rx.CovarianceTest.Movie;
1013
import rx.CovarianceTest.Rating;
1114
import rx.CovarianceTest.Result;
15+
import rx.EventStream.Event;
16+
import rx.observables.GroupedObservable;
1217
import rx.util.functions.Action1;
18+
import rx.util.functions.Func1;
1319
import rx.util.functions.Func2;
1420

1521
public class ZipTests {
1622

23+
@Test
24+
public void testZipObservableOfObservables() {
25+
EventStream.getEventStream("HTTP-ClusterB", 20)
26+
.groupBy(new Func1<Event, String>() {
27+
28+
@Override
29+
public String call(Event e) {
30+
return e.instanceId;
31+
}
32+
33+
// now we have streams of cluster+instanceId
34+
}).flatMap(new Func1<GroupedObservable<String, Event>, Observable<Map<String, String>>>() {
35+
36+
@Override
37+
public Observable<Map<String, String>> call(final GroupedObservable<String, Event> ge) {
38+
return ge.scan(new HashMap<String, String>(), new Func2<Map<String, String>, Event, Map<String, String>>() {
39+
40+
@Override
41+
public Map<String, String> call(Map<String, String> accum, Event perInstanceEvent) {
42+
accum.put("instance", ge.getKey());
43+
return accum;
44+
}
45+
46+
});
47+
}
48+
})
49+
.take(10)
50+
.toBlockingObservable().forEach(new Action1<Map<String, String>>() {
51+
52+
@Override
53+
public void call(Map<String, String> v) {
54+
System.out.println(v);
55+
}
56+
57+
});
58+
59+
System.out.println("**** finished");
60+
}
61+
1762
/**
1863
* This won't compile if super/extends isn't done correctly on generics
1964
*/

0 commit comments

Comments
 (0)