Skip to content

Commit

Permalink
New Greengrass IPC client added (#121)
Browse files Browse the repository at this point in the history
update CRT to 0.9.2 (#17)
Adding Greengrass IPC client

Co-authored-by: Justin Boswell <[email protected]>
Co-authored-by: Michael Graeb <[email protected]>
  • Loading branch information
3 people authored Dec 15, 2020
1 parent 4be6b15 commit 2080cba
Show file tree
Hide file tree
Showing 157 changed files with 11,505 additions and 7 deletions.
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ mvn clean install
``` sh
# NOTE: use the latest version of the CRT here

git clone --branch v0.4.20 https://github.com/awslabs/aws-crt-java.git
git clone --branch v0.9.2 https://github.com/awslabs/aws-crt-java.git

git clone https://github.com/awslabs/aws-iot-device-sdk-java-v2.git
cd aws-crt-java
Expand All @@ -66,7 +66,7 @@ Supports API 26 or newer.
NOTE: The shadow sample does not currently complete on android due to its dependence on stdin keyboard input.

``` sh
git clone --recursive --branch v0.6.2 https://github.com/awslabs/aws-crt-java.git
git clone --recursive --branch v0.9.2 https://github.com/awslabs/aws-crt-java.git
git clone https://github.com/awslabs/aws-iot-device-sdk-java-v2.git
cd aws-crt-java/android
./gradlew connectedCheck # optional, will run the unit tests on any connected devices/emulators
Expand All @@ -87,7 +87,7 @@ repositories {
}
dependencies {
implementation 'software.amazon.awssdk.crt:android:0.6.2'
implementation 'software.amazon.awssdk.crt:android:0.9.2'
}
```

Expand Down
2 changes: 1 addition & 1 deletion android/app/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ repositories {
dependencies {
implementation fileTree(dir: 'libs', include: ['*.jar'])
implementation project(":iotdevicesdk")
implementation 'software.amazon.awssdk.crt:android:0.8.4'
implementation 'software.amazon.awssdk.crt:android:0.9.2'
implementation "org.jetbrains.kotlin:kotlin-stdlib-jdk7:$kotlin_version"
implementation 'androidx.appcompat:appcompat:1.1.0'
implementation 'androidx.core:core:1.2.0'
Expand Down
2 changes: 1 addition & 1 deletion android/iotdevicesdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ repositories {

dependencies {
implementation fileTree(dir: 'libs', include: ['*.jar'])
implementation 'software.amazon.awssdk.crt:android:0.8.4'
implementation 'software.amazon.awssdk.crt:android:0.9.2'
implementation 'com.google.code.gson:gson:2.8.5'
implementation 'androidx.appcompat:appcompat:1.1.0'
testImplementation 'junit:junit:4.12'
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package software.amazon.awssdk.eventstreamrpc;

import software.amazon.awssdk.crt.io.*;
import software.amazon.awssdk.eventstreamrpc.MessageAmendInfo;

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/**
* The closeable elements inside the EventStreamRPCConnectionConfig are not cleaned up when
* this config object is done. It is still up to the caller of the constructor to clean up
* resources that are associated in the config.
*
* The connect message transformer is used to supply additional connect message headers
* and supply the payload of the connect message. This is to be used to supply authentication
* information on the connect
*/
public class EventStreamRPCConnectionConfig {
private final ClientBootstrap clientBootstrap;
private final EventLoopGroup eventLoopGroup;
private final SocketOptions socketOptions;
private final ClientTlsContext tlsContext;
private final String host;
private final int port;

/**
* MessageAmendInfo here is used to add supplied headers to the Connect message, and
* set the payload of that message as well.
*/
private final Supplier<CompletableFuture<MessageAmendInfo>> connectMessageAmender;

public EventStreamRPCConnectionConfig(ClientBootstrap clientBootstrap, EventLoopGroup eventLoopGroup,
SocketOptions socketOptions, ClientTlsContext tlsContext,
String host, int port, Supplier<CompletableFuture<MessageAmendInfo>> connectMessageAmender) {
this.clientBootstrap = clientBootstrap;
this.eventLoopGroup = eventLoopGroup;
this.socketOptions = socketOptions;
this.tlsContext = tlsContext;
this.host = host;
this.port = port;
this.connectMessageAmender = connectMessageAmender;

//perform cast to throw exception here if port value is out of short value range
final short shortPort = (short)port;

//bit of C++ RAII here, validate what we can
if (clientBootstrap == null || eventLoopGroup == null || socketOptions == null ||
host == null || host.isEmpty() || port < 0) {
throw new IllegalArgumentException("EventStreamRPCConnectionConfig values are invalid!");
}
}

public ClientBootstrap getClientBootstrap() {
return clientBootstrap;
}

public EventLoopGroup getEventLoopGroup() {
return eventLoopGroup;
}

public SocketOptions getSocketOptions() {
return socketOptions;
}

public ClientTlsContext getTlsContext() {
return tlsContext;
}

public String getHost() {
return host;
}

public int getPort() {
return port;
}

public Supplier<CompletableFuture<MessageAmendInfo>> getConnectMessageAmender() {
return connectMessageAmender;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package software.amazon.awssdk.eventstreamrpc;

import com.google.gson.Gson;
import software.amazon.awssdk.crt.eventstream.Header;

import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class GreengrassConnectMessageSupplier {

public static Supplier<CompletableFuture<MessageAmendInfo>> connectMessageSupplier(String authToken) {
return () -> {
final List<Header> headers = new LinkedList<>();
GreengrassEventStreamConnectMessage connectMessage = new GreengrassEventStreamConnectMessage();
connectMessage.setAuthToken(authToken);
String payload = new Gson().toJson(connectMessage);
return CompletableFuture.completedFuture(new MessageAmendInfo(headers, payload.getBytes(StandardCharsets.UTF_8)));
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package software.amazon.awssdk.eventstreamrpc;

public class GreengrassEventStreamConnectMessage {

private String authToken;

public void setAuthToken(String authToken) {
this.authToken = authToken;
}

public String getAuthToken() {
return this.authToken;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package software.amazon.awssdk.eventstreamrpc;

import software.amazon.awssdk.crt.eventstream.ClientConnectionContinuation;
import software.amazon.awssdk.crt.eventstream.Header;
import software.amazon.awssdk.crt.eventstream.MessageFlags;
import software.amazon.awssdk.crt.eventstream.MessageType;
import software.amazon.awssdk.eventstreamrpc.model.EventStreamJsonMessage;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Logger;

/**
* Underlying type for operation response handling. Enables publishing on stream operations from
* client, closing of any open stream, and retrieval of response. Specific generated operation response
* handlers are usually simple wrappers with the generic types specified
*
* @param <ResponseType>
* @param <StreamRequestType>
*/
public class OperationResponse<ResponseType extends EventStreamJsonMessage,
StreamRequestType extends EventStreamJsonMessage>
implements StreamResponse<ResponseType, StreamRequestType>, AutoCloseable {
private static final Logger LOGGER = Logger.getLogger(OperationResponse.class.getName());
private final OperationModelContext operationModelContext;
private final ClientConnectionContinuation continuation;
private final CompletableFuture<ResponseType> responseFuture;
private final CompletableFuture<Void> requestFlushFuture;
private final AtomicBoolean isClosed;

public OperationResponse(OperationModelContext<ResponseType, ?, StreamRequestType, ?> operationModelContext,
ClientConnectionContinuation continuation,
CompletableFuture<ResponseType> responseFuture,
CompletableFuture<Void> requestFlushFuture) {
this.operationModelContext = operationModelContext;
this.continuation = continuation;
this.responseFuture = responseFuture;
this.requestFlushFuture = requestFlushFuture;
this.isClosed = new AtomicBoolean(continuation != null && !continuation.isNull());
}

final public CompletableFuture<Void> getRequestFlushFuture() {
return requestFlushFuture;
}

/**
* Get the response completable future to wait on the initial response
* if there is one.
*
* May throw exception if requestFlushFuture throws an exception and will
* block if requestFlush has not completed.
*
* @return
*/
public CompletableFuture<ResponseType> getResponse() {
//semantics here are: if the request was never successfully sent
//then the request flush future holds the exception thrown so that
//must be made visible of the caller waits for the response directly.
//It is impossible to have a successful response future completed
//with a request flush never having completed or having thrown an
//exception.
return requestFlushFuture.thenCompose((v) -> responseFuture);
}

/**
* Publish stream events on an open operation's event stream.
* @param streamEvent event to publish
*/
@Override
public CompletableFuture<Void> sendStreamEvent(final StreamRequestType streamEvent) {
try {
final List<Header> headers = new LinkedList<>();
headers.add(Header.createHeader(EventStreamRPCServiceModel.SERVICE_MODEL_TYPE_HEADER,
(String) operationModelContext.getStreamingRequestApplicationModelType().get()));
headers.add(Header.createHeader(EventStreamRPCServiceModel.CONTENT_TYPE_HEADER,
EventStreamRPCServiceModel.CONTENT_TYPE_APPLICATION_JSON));
final byte[] payload = operationModelContext.getServiceModel()
.toJson(streamEvent);
return continuation.sendMessage(headers, payload,
MessageType.ApplicationMessage, 0)
.whenComplete((res, ex) -> {
if (ex != null) {
LOGGER.warning(String.format("%s caught %s while sending message the event stream: %s",
operationModelContext.getOperationName(), ex.getClass().getName(),
ex.getMessage()));
closeStream();
}
});
} catch (Exception e) {
final CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
}

/**
* Initiate a close on the event stream from the client side.
*
* @return
*/
@Override
public CompletableFuture<Void> closeStream() {
if (continuation != null && !continuation.isNull()) {
return continuation.sendMessage(null, null,
MessageType.ApplicationMessage, MessageFlags.TerminateStream.getByteValue())
.whenComplete((res, ex) -> {
LOGGER.info(operationModelContext.getOperationName() + " operation stream closed");
continuation.close();
if (ex != null) {
LOGGER.warning(String.format("%s threw %s while closing the event stream: %s",
operationModelContext.getOperationName(), ex.getClass().getName(),
ex.getMessage()));
}
});
}
return CompletableFuture.completedFuture(null);
}

/**
* Checks if the stream is closed
* @return
*/
public boolean isClosed() {
return isClosed.get();
}

@Override
public void close() throws Exception {
if (isClosed.compareAndSet(false, true)) {
closeStream();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package software.amazon.awssdk.eventstreamrpc;

import software.amazon.awssdk.eventstreamrpc.StreamEventPublisher;
import software.amazon.awssdk.eventstreamrpc.model.EventStreamJsonMessage;

import java.util.concurrent.CompletableFuture;

public interface StreamResponse<ResponseType extends EventStreamJsonMessage, StreamRequestType extends EventStreamJsonMessage>
extends StreamEventPublisher<StreamRequestType> {
/**
* Completable future indicating flush of the request that initiated the stream operation
*
* @return
*/
CompletableFuture<Void> getRequestFlushFuture();

/**
* Completable future for retrieving the initial-response of the stream operation
*
* @return
*/
CompletableFuture<ResponseType> getResponse();

/**
* Tests if the stream is closed
* @return
*/
boolean isClosed();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package software.amazon.awssdk.eventstreamrpc;

/**
* Operation response handler is needed to invoke an operation that has a streaming
* response element to it.
*
* @param <StreamEventType>
*/
public interface StreamResponseHandler<StreamEventType> {

/**
*
* @param streamEvent
*/
void onStreamEvent(final StreamEventType streamEvent);

/**
* Called when there's an error in the stream. Return value of this function
* suggests whether or not the client handling will keep the stream open
* or close it.
*
* There are conditions when onStreamError() may be triggered but the client handling will
* close the connection anyways.
*
* @param error
* @return true if the stream should be closed on this error, false if stream should remain open
*/
boolean onStreamError(final Throwable error);

/**
* Called when stream is closed
*/
void onStreamClosed();
}
Loading

0 comments on commit 2080cba

Please sign in to comment.