Skip to content

Commit

Permalink
Implemented JobOutputStreamForwarder which fixes #6
Browse files Browse the repository at this point in the history
  • Loading branch information
jmaassen committed Jun 8, 2017
1 parent e45f7b9 commit ba6b8cc
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 26 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ hs_err_pid*

__pycache__
*.pyc
/bin/
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ plugins {
id 'com.github.johnrengelman.shadow' version '2.0.0'
// ide
id 'idea'
id 'eclipse'
// test coverage
id 'jacoco'
// quality
Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,80 @@
package nl.esciencecenter.xenon.grpc.jobs;

import java.io.IOException;
import java.io.InputStream;

import nl.esciencecenter.xenon.grpc.XenonProto;

import com.google.protobuf.ByteString;

import io.grpc.Status;
import io.grpc.stub.StreamObserver;

class JobOutputStreamsForwarder {
private final XenonProto.JobOutputStreams.Builder builder;

private final XenonProto.JobOutputStreams.Builder builder;
private final StreamObserver<XenonProto.JobOutputStreams> observer;

private static final int BUFFER_SIZE = 1024;

private int streamsDone = 0;

class StreamForwarder extends Thread {

private final byte[] buffer = new byte[BUFFER_SIZE];

private final InputStream in;

private boolean stdout = false;

public StreamForwarder(InputStream in, boolean stdout) {
super("Stream forwarder " + (stdout ? "stdout" : "stderr"));
this.in = in;
this.stdout = stdout;
setDaemon(true);
}

public void run() {
try {
while (true) {
int read = in.read(buffer);

if (read > 0) {
XenonProto.JobOutputStreams response;

if (stdout) {
response = builder.clearStderr().setStdout(ByteString.copyFrom(buffer, 0, read)).build();
} else {
response = builder.clearStdout().setStderr(ByteString.copyFrom(buffer, 0, read)).build();
}

writeOut(response);

} else if (read == -1) {
close();
return;
} }
} catch (IOException e) {
observer.onError(Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asException());
}
}
}

JobOutputStreamsForwarder(StreamObserver<XenonProto.JobOutputStreams> responseObserver, InputStream stderr, InputStream stdout) {
this.observer = responseObserver;
builder = XenonProto.JobOutputStreams.newBuilder();
// TODO setup forwarder to pipe stdout and stderr of streams to responseObserver
}

private void writeStdOut(byte[] value) {
XenonProto.JobOutputStreams response = builder.clearStderr().setStdout(ByteString.copyFrom(value)).build();
observer.onNext(response);
// We should fully read the in and output streams here (non blocking and in parallel) and forward the data to the responseObserver.
new StreamForwarder(stdout, true).start();
new StreamForwarder(stderr, false).start();
}

private void writeStdErr(byte[] value) {
XenonProto.JobOutputStreams response = builder.clearStdout().setStderr(ByteString.copyFrom(value)).build();
observer.onNext(response);
private synchronized void writeOut(XenonProto.JobOutputStreams response) {
observer.onNext(response);
}

void close() {
observer.onCompleted();
public synchronized void close() {
if (++streamsDone == 2) {
observer.onCompleted();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ public void onCompleted() {
if (streams != null) {
try {
streams.getStdin().close();
forwarder.close();
//forwarder.close();
} catch (IOException e) {
LOGGER.warn("Error from server", e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package nl.esciencecenter.xenon.grpc.jobs;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import nl.esciencecenter.xenon.grpc.XenonJobsGrpc;
import nl.esciencecenter.xenon.grpc.XenonProto;
import nl.esciencecenter.xenon.grpc.XenonProto.JobOutputStreams;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;

import java.io.IOException;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import com.google.protobuf.ByteString;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import io.grpc.stub.StreamObserver;
import nl.esciencecenter.xenon.grpc.XenonJobsGrpc;
import nl.esciencecenter.xenon.grpc.XenonProto;
import nl.esciencecenter.xenon.grpc.XenonProto.JobOutputStreams;

public class LocalJobsStreamsTest extends LocalJobsServiceTestBase {
private XenonJobsGrpc.XenonJobsStub aclient;
Expand All @@ -29,7 +33,6 @@ public void setUp() throws IOException {
aclient = XenonJobsGrpc.newStub(channel);
}

@Ignore("Requires implementation")
@Test
public void getStreams_wc() {
// submit job
Expand All @@ -39,10 +42,12 @@ public void getStreams_wc() {
.setWorkingDirectory(myfolder.getRoot().getAbsolutePath())
.setInteractive(true)
.build();

XenonProto.SubmitJobRequest jobRequest = XenonProto.SubmitJobRequest.newBuilder()
.setDescription(description)
.setScheduler(getScheduler())
.build();

XenonProto.Job job = client.submitJob(jobRequest);
// mock receiver
@SuppressWarnings("unchecked")
Expand All @@ -57,14 +62,16 @@ public void getStreams_wc() {
.setJob(job)
.setStdin(stdin)
.build();

requestObserver.onNext(request);
requestObserver.onCompleted();

// receive
ArgumentCaptor<JobOutputStreams> responseCapturer = ArgumentCaptor.forClass(JobOutputStreams.class);
verify(responseObserver, timeout(100)).onNext(responseCapturer.capture());
ByteString expectedStdout = ByteString.copyFromUtf8(" 1 4 16");
ByteString expectedStdout = ByteString.copyFromUtf8(" 0 4 15\n");
JobOutputStreams response = responseCapturer.getValue();

JobOutputStreams expected = JobOutputStreams.newBuilder()
.setStdout(expectedStdout)
.build();
Expand Down

0 comments on commit ba6b8cc

Please sign in to comment.