diff --git a/.gitignore b/.gitignore index 511ef8f..bcfde3c 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ hs_err_pid* __pycache__ *.pyc +/bin/ diff --git a/build.gradle b/build.gradle index 7d742ef..fa3ae6a 100644 --- a/build.gradle +++ b/build.gradle @@ -6,6 +6,7 @@ plugins { id 'com.github.johnrengelman.shadow' version '2.0.0' // ide id 'idea' + id 'eclipse' // test coverage id 'jacoco' // quality diff --git a/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobOutputStreamsForwarder.java b/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobOutputStreamsForwarder.java index e98a9e7..f1a4be6 100644 --- a/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobOutputStreamsForwarder.java +++ b/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobOutputStreamsForwarder.java @@ -1,33 +1,80 @@ package nl.esciencecenter.xenon.grpc.jobs; +import java.io.IOException; import java.io.InputStream; import nl.esciencecenter.xenon.grpc.XenonProto; import com.google.protobuf.ByteString; + +import io.grpc.Status; import io.grpc.stub.StreamObserver; class JobOutputStreamsForwarder { - private final XenonProto.JobOutputStreams.Builder builder; + + private final XenonProto.JobOutputStreams.Builder builder; private final StreamObserver observer; + private static final int BUFFER_SIZE = 1024; + + private int streamsDone = 0; + + class StreamForwarder extends Thread { + + private final byte[] buffer = new byte[BUFFER_SIZE]; + + private final InputStream in; + + private boolean stdout = false; + + public StreamForwarder(InputStream in, boolean stdout) { + super("Stream forwarder " + (stdout ? "stdout" : "stderr")); + this.in = in; + this.stdout = stdout; + setDaemon(true); + } + + public void run() { + try { + while (true) { + int read = in.read(buffer); + + if (read > 0) { + XenonProto.JobOutputStreams response; + + if (stdout) { + response = builder.clearStderr().setStdout(ByteString.copyFrom(buffer, 0, read)).build(); + } else { + response = builder.clearStdout().setStderr(ByteString.copyFrom(buffer, 0, read)).build(); + } + + writeOut(response); + + } else if (read == -1) { + close(); + return; + } } + } catch (IOException e) { + observer.onError(Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asException()); + } + } + } + JobOutputStreamsForwarder(StreamObserver responseObserver, InputStream stderr, InputStream stdout) { this.observer = responseObserver; builder = XenonProto.JobOutputStreams.newBuilder(); - // TODO setup forwarder to pipe stdout and stderr of streams to responseObserver - } - - private void writeStdOut(byte[] value) { - XenonProto.JobOutputStreams response = builder.clearStderr().setStdout(ByteString.copyFrom(value)).build(); - observer.onNext(response); + // We should fully read the in and output streams here (non blocking and in parallel) and forward the data to the responseObserver. + new StreamForwarder(stdout, true).start(); + new StreamForwarder(stderr, false).start(); } - private void writeStdErr(byte[] value) { - XenonProto.JobOutputStreams response = builder.clearStdout().setStderr(ByteString.copyFrom(value)).build(); - observer.onNext(response); + private synchronized void writeOut(XenonProto.JobOutputStreams response) { + observer.onNext(response); } - void close() { - observer.onCompleted(); + public synchronized void close() { + if (++streamsDone == 2) { + observer.onCompleted(); + } } } diff --git a/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobsService.java b/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobsService.java index e161aca..fde3ef1 100644 --- a/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobsService.java +++ b/src/main/java/nl/esciencecenter/xenon/grpc/jobs/JobsService.java @@ -494,7 +494,7 @@ public void onCompleted() { if (streams != null) { try { streams.getStdin().close(); - forwarder.close(); + //forwarder.close(); } catch (IOException e) { LOGGER.warn("Error from server", e); } diff --git a/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsStreamsTest.java b/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsStreamsTest.java index 86912c9..3491077 100644 --- a/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsStreamsTest.java +++ b/src/test/java/nl/esciencecenter/xenon/grpc/jobs/LocalJobsStreamsTest.java @@ -1,20 +1,24 @@ package nl.esciencecenter.xenon.grpc.jobs; -import com.google.protobuf.ByteString; -import io.grpc.stub.StreamObserver; -import nl.esciencecenter.xenon.grpc.XenonJobsGrpc; -import nl.esciencecenter.xenon.grpc.XenonProto; -import nl.esciencecenter.xenon.grpc.XenonProto.JobOutputStreams; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +import java.io.IOException; + import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.mockito.ArgumentCaptor; -import java.io.IOException; +import com.google.protobuf.ByteString; -import static org.junit.Assert.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import io.grpc.stub.StreamObserver; +import nl.esciencecenter.xenon.grpc.XenonJobsGrpc; +import nl.esciencecenter.xenon.grpc.XenonProto; +import nl.esciencecenter.xenon.grpc.XenonProto.JobOutputStreams; public class LocalJobsStreamsTest extends LocalJobsServiceTestBase { private XenonJobsGrpc.XenonJobsStub aclient; @@ -29,7 +33,6 @@ public void setUp() throws IOException { aclient = XenonJobsGrpc.newStub(channel); } - @Ignore("Requires implementation") @Test public void getStreams_wc() { // submit job @@ -39,10 +42,12 @@ public void getStreams_wc() { .setWorkingDirectory(myfolder.getRoot().getAbsolutePath()) .setInteractive(true) .build(); + XenonProto.SubmitJobRequest jobRequest = XenonProto.SubmitJobRequest.newBuilder() .setDescription(description) .setScheduler(getScheduler()) .build(); + XenonProto.Job job = client.submitJob(jobRequest); // mock receiver @SuppressWarnings("unchecked") @@ -57,14 +62,16 @@ public void getStreams_wc() { .setJob(job) .setStdin(stdin) .build(); + requestObserver.onNext(request); requestObserver.onCompleted(); - + // receive ArgumentCaptor responseCapturer = ArgumentCaptor.forClass(JobOutputStreams.class); verify(responseObserver, timeout(100)).onNext(responseCapturer.capture()); - ByteString expectedStdout = ByteString.copyFromUtf8(" 1 4 16"); + ByteString expectedStdout = ByteString.copyFromUtf8(" 0 4 15\n"); JobOutputStreams response = responseCapturer.getValue(); + JobOutputStreams expected = JobOutputStreams.newBuilder() .setStdout(expectedStdout) .build();