Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix compile warnings in mantis server worker #423

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
Fixed warnings in src/test
SalmaAfifi committed Apr 18, 2023
commit e2dcdc67078d03a38902d67b19b5d90f6da45217
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ public class DataDroppedPayloadSetterTest {
private static final Logger logger = LoggerFactory.getLogger(DataDroppedPayloadSetterTest.class);

@Test
public void testAggregateDropOperatorMetrics() throws Exception {
public void testAggregateDropOperatorMetrics() {
SpectatorRegistryFactory.setRegistry(new DefaultRegistry());
Heartbeat heartbeat = new Heartbeat("job-1", 1, 1, 1);
DataDroppedPayloadSetter payloadSetter = new DataDroppedPayloadSetter(heartbeat);
Original file line number Diff line number Diff line change
@@ -16,17 +16,18 @@

package io.mantisrx.server.worker;

import static org.junit.Assert.assertEquals;

import io.mantisrx.server.core.Status;
import io.mantisrx.server.core.StatusPayloads;
import java.util.List;
import junit.framework.Assert;
import org.junit.Test;


public class HeartbeatTest {

@Test
public void testSingleUsePayloads() throws Exception {
public void testSingleUsePayloads() {
Heartbeat heartbeat = new Heartbeat("Jobcluster-123", 1, 0, 0);
heartbeat.setPayload("" + StatusPayloads.Type.SubscriptionState, "true");
int val1 = 10;
@@ -35,14 +36,14 @@ public void testSingleUsePayloads() throws Exception {
heartbeat.addSingleUsePayload("" + StatusPayloads.Type.IncomingDataDrop, "" + val2);
final Status currentHeartbeatStatus = heartbeat.getCurrentHeartbeatStatus();
List<Status.Payload> payloads = currentHeartbeatStatus.getPayloads();
Assert.assertEquals(2, payloads.size());
assertEquals(2, payloads.size());
int value = 0;
for (Status.Payload p : payloads) {
if (StatusPayloads.Type.valueOf(p.getType()) == StatusPayloads.Type.IncomingDataDrop)
value = Integer.parseInt(p.getData());
}
Assert.assertEquals(val2, value);
assertEquals(val2, value);
payloads = heartbeat.getCurrentHeartbeatStatus().getPayloads();
Assert.assertEquals(1, payloads.size());
assertEquals(1, payloads.size());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets log the payloads before asserting the size, so when the test fails, it's clearer why.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the suggestion. Fixed in 07ad134

}
}
Original file line number Diff line number Diff line change
@@ -16,7 +16,8 @@

package io.mantisrx.server.worker;

import static junit.framework.Assert.assertEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import io.mantisrx.runtime.MantisJobDurationType;
@@ -61,7 +62,7 @@ public void convertJobSchedulingInfoToWorkerMapTest() {

List<WorkerInfo> workersForStage1 = workerMap.getWorkersForStage(1);

assertTrue(workersForStage1 != null);
assertNotNull(workersForStage1);
assertEquals(2, workersForStage1.size());

for (int i = 0; i < workersForStage1.size(); i++) {
@@ -75,7 +76,7 @@ public void convertJobSchedulingInfoToWorkerMapTest() {

List<WorkerInfo> workersForStage2 = workerMap.getWorkersForStage(2);

assertTrue(workersForStage2 != null);
assertNotNull(workersForStage2);
assertEquals(4, workersForStage2.size());

for (int i = 0; i < workersForStage2.size(); i++) {
@@ -156,16 +157,11 @@ WorkerAssignments createWorkerAssignments(int stageNo, int noWorkers) {
@Test
public void deferTest() throws InterruptedException {

Subscription subscribe1 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> {
System.out.println("In 1 -> " + t);
});
Subscription subscribe1 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> System.out.println("In 1 -> " + t));

Thread.sleep(5000);

Subscription subscribe2 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> {
System.out.println("In 2 -> " + t);
});

Subscription subscribe2 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> System.out.println("In 2 -> " + t));

Thread.sleep(5000);
subscribe1.unsubscribe();
@@ -174,69 +170,49 @@ public void deferTest() throws InterruptedException {
subscribe2.unsubscribe();

Thread.sleep(5000);
Subscription subscribe3 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> {
System.out.println("In 3 -> " + t);
});
Subscription subscribe3 = getObs4().subscribeOn(Schedulers.io()).subscribe((t) -> System.out.println("In 3 -> " + t));
Thread.sleep(5000);
subscribe3.unsubscribe();
Thread.sleep(10000);
}

Observable<Long> getObs() {
Observable<Long> oLong = Observable.defer(() -> {
return Observable.interval(1, TimeUnit.SECONDS).doOnNext((e) -> {
System.out.println("Minted " + e);
}).share();
}).doOnSubscribe(() -> {
System.out.println("Subscribed111" + System.currentTimeMillis());
}).doOnUnsubscribe(() -> {
System.out.println("UnSubscribed111" + System.currentTimeMillis());
});
Observable<Long> oLong =
Observable.defer(() -> Observable.interval(1, TimeUnit.SECONDS)
.doOnNext((e) -> System.out.println("Minted " + e)).share())
.doOnSubscribe(() -> System.out.println("Subscribed111" + System.currentTimeMillis()))
.doOnUnsubscribe(() -> System.out.println("UnSubscribed111" + System.currentTimeMillis()));
return oLong;
}

Observable<Long> getObs2() {

return Observable.interval(1, TimeUnit.SECONDS)
.doOnNext((e) -> {
System.out.println("Minted " + e);
})
.doOnNext((e) -> System.out.println("Minted " + e))
.share()
.doOnSubscribe(() -> {
System.out.println("Subscribed111" + System.currentTimeMillis());
}).doOnUnsubscribe(() -> {
System.out.println("UnSubscribed111" + System.currentTimeMillis());
})

;
.doOnSubscribe(() -> System.out.println("Subscribed111" + System.currentTimeMillis()))
.doOnUnsubscribe(() -> System.out.println("UnSubscribed111" + System.currentTimeMillis()));

}

Observable<Long> getObs3() {

return Observable.range(1, 100).doOnNext((e) -> {
System.out.println("Minted " + e);
}).map((i) -> {
return new Long(i);
}).share()
.doOnSubscribe(() -> {
System.out.println("Subscribed111" + System.currentTimeMillis());
}).doOnUnsubscribe(() -> {
System.out.println("UnSubscribed111" + System.currentTimeMillis());
});
return Observable.range(1, 100)
.doOnNext((e) -> System.out.println("Minted " + e))
.map(Long::new)
.share()
.doOnSubscribe(() -> System.out.println("Subscribed111" + System.currentTimeMillis()))
.doOnUnsubscribe(() -> System.out.println("UnSubscribed111" + System.currentTimeMillis()));

}

Observable<Long> getObs4() {
BehaviorSubject<Long> o = BehaviorSubject.create();
Observable.interval(1, TimeUnit.SECONDS).doOnNext((e) -> {
System.out.println("Minted " + e);
}).doOnSubscribe(() -> {
System.out.println("Subscribed111" + System.currentTimeMillis());
}).doOnUnsubscribe(() -> {
System.out.println("UnSubscribed111" + System.currentTimeMillis());
})
.subscribe(o);
Observable.interval(1, TimeUnit.SECONDS)
.doOnNext((e) -> System.out.println("Minted " + e))
.doOnSubscribe(() -> System.out.println("Subscribed111" + System.currentTimeMillis()))
.doOnUnsubscribe(() -> System.out.println("UnSubscribed111" + System.currentTimeMillis()))
.subscribe(o);

return o;