Skip to content

Commit

Permalink
support for static project and arc metadata
Browse files Browse the repository at this point in the history
on deploy project and arc data will be uploaded to the metadata bucket, and removed on destroy.

arc state metadata keys have been renamed, but shouldn't harm existing deployments.
  • Loading branch information
cwensel committed Sep 21, 2023
1 parent 8ecf537 commit 0f344d5
Show file tree
Hide file tree
Showing 34 changed files with 1,001 additions and 168 deletions.
13 changes: 13 additions & 0 deletions clusterless-common/src/main/java/clusterless/json/JSONUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
Expand Down Expand Up @@ -213,6 +214,18 @@ public static <T> T readAsObjectSafe(Path path, Class<T> type) {
}
}

public static <T> T readAsObjectSafe(Path path, TypeReference<T> type) {
try {
return OBJECT_READER.forType(type).readValue(path.toFile());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

public static <T> T readAsObject(Path path, TypeReference<T> type) throws IOException {
return OBJECT_READER.forType(type).readValue(path.toFile());
}

public static String writeTypedAsStringSafe(Object object) {
try {
return TYPED_OBJECT_MAPPER.writeValueAsString(object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class ArcNotifyEvent implements NotifyEvent, Struct {
public static final String DETAIL = "Clusterless Arc Notification";
public static final String DATASET_ID = "datasetId";

//todo: this may be redundant as the dataset can be looked up by role in the arc props
//TODO: this may be redundant as the dataset can be looked up by role in the arc props
Dataset dataset;
String lot;
URI manifest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,15 @@ public void isSuccessOrThrowRuntime(Function<Response, String> message) {
isSuccessOrThrow(message, RuntimeException::new);
}

public void isNotSuccessOrThrow(Function<Response, String> message, BiFunction<String, Exception, RuntimeException> exception) {
public void isNotSuccessOrThrow(Function<Response, String> message, BiFunction<String, Throwable, RuntimeException> exception) {
isOrThrow(Predicate.not(Response::isSuccess), message, exception);
}

public void isSuccessOrThrow(Function<Response, String> message, BiFunction<String, Exception, RuntimeException> exception) {
public void isSuccessOrThrow(Function<Response, String> message, BiFunction<String, Throwable, RuntimeException> exception) {
isOrThrow(Response::isSuccess, message, exception);
}

private void isOrThrow(Predicate<Response> predicate, Function<Response, String> message, BiFunction<String, Exception, RuntimeException> exception) {
private void isOrThrow(Predicate<Response> predicate, Function<Response, String> message, BiFunction<String, Throwable, RuntimeException> exception) {
if (predicate.test(this)) {
return;
}
Expand All @@ -165,6 +165,21 @@ private void isOrThrow(Predicate<Response> predicate, Function<Response, String>
throw exception.apply(m, this.exception);
}

public Optional<Throwable> isSuccessOrLog(Function<Response, String> message) {
return isOrLog(Response::isSuccess, message);
}

private Optional<Throwable> isOrLog(Predicate<Response> predicate, Function<Response, String> message) {
if (predicate.test(this)) {
return Optional.empty();
}

String m = message.apply(this);
LOG.error(m, this.errorMessage());

return Optional.of(this.exception);
}

public String errorMessage() {
if (exception != null) {
return exception.getMessage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ public boolean copy(List<Tuple2<URI, URI>> toUris, Consumer<URI> success, BiFunc
URI from = tuple.get_1();
URI to = tuple.get_2();

// todo: apply retry logic here. note response knows about throttling and retryable exceptions
// TODO: apply retry logic here. note response knows about throttling and retryable exceptions
S3.Response response = copy(client, from, to);

if (response.isSuccess()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import clusterless.substrate.aws.props.LambdaJavaRuntimeProps;
import clusterless.substrate.aws.resources.Arcs;
import clusterless.substrate.aws.resources.Events;
import clusterless.substrate.uri.ArcURI;
import clusterless.substrate.uri.ArcStateURI;
import org.jetbrains.annotations.NotNull;
import software.amazon.awscdk.RemovalPolicy;
import software.amazon.awscdk.services.events.targets.SfnStateMachine;
Expand Down Expand Up @@ -57,7 +57,7 @@ public ArcOrchestration(@NotNull ManagedComponentContext context, @NotNull Arc<?
.withSinks(arc.sinks())
.withSources(arc.sources())
.withArcStatePath(
ArcURI.builder()
ArcStateURI.builder()
.withPlacement(context().deployable().placement())
.withProject(context().deployable().project())
.withArcName(arc.name())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ protected void confirmBootstrapForPlacements(List<File> projectFiles, String aws
continue;
}

// todo: add copy/paste bootstrap command here
// TODO: add copy/paste bootstrap command here
String account = placement.account();
String region = placement.region();
String stage = placement.stage();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@

package clusterless.substrate.aws.cdk;

import java.util.Optional;

public enum CDKCommand {
Deploy,
Destroy,
Diff,
Verify,
Synth,
Import;
DEPLOY,
DESTROY,
DIFF,
VERIFY,
SYNTH,
IMPORT;

public String command() {
return name().toLowerCase();
}

public static CDKCommand from(String command) {
return Optional.ofNullable(command).map(String::toUpperCase).map(CDKCommand::valueOf).orElse(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand All @@ -45,6 +46,7 @@ public class CDKProcessExec extends ProcessExec {
private static final Logger LOG = LogManager.getLogger(CDKProcessExec.class);
public static final String CLS_CDK_COMMAND = "CLS_CDK_COMMAND";
public static final String CLS_CDK_OUTPUT_PATH = "CLS_CDK_OUTPUT_PATH";
public static final String CLS_CDK_PROFILE = "CLS_CDK_PROFILE";

@CommandLine.Option(names = "--cdk", description = {"path to the cdk binary", "uses $PATH by default to search for 'cdk'"})
private String cdk = "cdk";
Expand Down Expand Up @@ -174,15 +176,16 @@ public Integer executeCDKApp(@NotNull CommonConfig commonConfig, @NotNull AwsCon
cdkCommands.addAll(
SafeList.of(
cdkCommand.command(),
cdkCommand != CDKCommand.Import ? "--all" : null // deploy all stacks
cdkCommand != CDKCommand.IMPORT ? "--all" : null // deploy all stacks
)
);

cdkCommands.addAll(commandArgs);

Map<String, String> environment = OrderedSafeMaps.of(
CLS_CDK_COMMAND, cdkCommand.command(),
CLS_CDK_OUTPUT_PATH, getOutputPath()
CLS_CDK_OUTPUT_PATH, getOutputPath(),
CLS_CDK_PROFILE, profile()
);

return executeProcess(environment, cdkCommands);
Expand Down Expand Up @@ -231,4 +234,13 @@ protected String getLocalStackPort() {
protected String filesAsArg(List<File> files) {
return files.stream().map(Object::toString).collect(Collectors.joining(","));
}

public static CDKCommand currentCommand() {
return CDKCommand.from(System.getenv().get(CLS_CDK_COMMAND));
}

public static Path cdkLocalOutputPath() {
return Optional.ofNullable(System.getenv().get(CLS_CDK_OUTPUT_PATH)).map(Paths::get).orElse(null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,32 +9,24 @@
package clusterless.substrate.aws.cdk.bootstrap;

import clusterless.command.BootstrapCommandOptions;
import clusterless.json.JSONUtil;
import clusterless.naming.Stage;
import clusterless.substrate.aws.cdk.BaseCDKCommand;
import clusterless.substrate.aws.cdk.CDKCommand;
import clusterless.substrate.aws.cdk.CDKProcessExec;
import clusterless.substrate.aws.managed.StagedApp;
import clusterless.substrate.aws.meta.Metadata;
import clusterless.substrate.aws.resources.Stacks;
import clusterless.substrate.aws.sdk.S3;
import clusterless.util.ExitCodeException;
import clusterless.util.Lists;
import clusterless.util.OrderedSafeMaps;
import clusterless.util.Strings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import picocli.CommandLine;
import software.amazon.awscdk.AppProps;
import software.amazon.awscdk.Environment;
import software.amazon.awscdk.StackProps;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.Callable;

Expand Down Expand Up @@ -91,7 +83,7 @@ private Integer exec() {

processExec.setUseTempOutput(true);

CDKCommand cdkCommand = destroyBootstrap ? CDKCommand.Destroy : CDKCommand.Deploy;
CDKCommand cdkCommand = destroyBootstrap ? CDKCommand.DESTROY : CDKCommand.DEPLOY;
List<String> approvals = destroyBootstrap ? getRequireDestroyApproval(commandOptions.approve().orElse(null)) : getRequireDeployApproval(commandOptions.approve().orElse(null));

int exitCode = processExec.executeCDKApp(getCommonConfig(), getProviderConfig(), cdkCommand, approvals, "bootstrap", kernelArgs);
Expand All @@ -111,27 +103,7 @@ private Integer exec() {
return 0;
}

Path bootstrapMetaPath = createBootstrapMetaPath(processExec.getOutputPath());

LOG.info("reading metadata from: {}", bootstrapMetaPath.toAbsolutePath());

if (commandOptions.dryRun()) {
LOG.info("dry run, skipping metadata upload");
return 0;
}

BootstrapMeta bootstrapMeta = JSONUtil.readAsObjectSafe(bootstrapMetaPath, BootstrapMeta.class);

S3 s3 = new S3(processExec.profile(), region);

URI metaURI = S3.createS3URI(bootstrapMeta.exports().get("metadata").name(), "metadata.json");

LOG.info("putting metadata in: {}", metaURI);

s3.put(metaURI, "application/json", bootstrapMeta)
.isSuccessOrThrowRuntime(r -> String.format("unable to upload bootstrap metadata to: %s, %s", metaURI, r.errorMessage()));

return 0;
return Metadata.pushBootstrapMetadata(processExec.profile(), region, processExec.getOutputPath(), commandOptions.dryRun());
}

private Integer synth() {
Expand Down Expand Up @@ -161,30 +133,8 @@ private Integer synth() {

app.synth();

writeBootstrapMeta(deployMeta);
Metadata.writeBootstrapMetaLocal(deployMeta);

return 0;
}

private static void writeBootstrapMeta(BootstrapMeta bootstrapMeta) {
String outputPath = System.getenv().get(CDKProcessExec.CLS_CDK_OUTPUT_PATH);

if (outputPath != null) {
Path bootstrapMetaPath = createBootstrapMetaPath(outputPath);
LOG.info("writing metadata to: {}", bootstrapMetaPath.toAbsolutePath());

try {
Files.createDirectories(bootstrapMetaPath.getParent());
} catch (IOException e) {
throw new UncheckedIOException(e);
}

JSONUtil.writeAsStringSafe(bootstrapMetaPath, bootstrapMeta);
}
}

@NotNull
private static Path createBootstrapMetaPath(String outputPath) {
return Paths.get(outputPath).resolve("bootstrap").resolve("meta.json");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import clusterless.substrate.aws.cdk.BaseCDKCommand;
import clusterless.substrate.aws.cdk.CDKCommand;
import clusterless.substrate.aws.cdk.CDKProcessExec;
import clusterless.substrate.aws.meta.Metadata;
import picocli.CommandLine;

import java.util.concurrent.Callable;
Expand All @@ -30,13 +31,19 @@ public Integer call() throws Exception {

confirmBootstrapForPlacements(commandOptions.projectFiles(), processExec.profile());

return processExec.executeLifecycleProcess(
Integer exitCode = processExec.executeLifecycleProcess(
getCommonConfig(),
getProviderConfig(),
commandOptions,
CDKCommand.Deploy,
CDKCommand.DEPLOY,
getRequireDeployApproval(commandOptions.approve().orElse(null))
);

if (exitCode != 0) {
return exitCode;
}

return Metadata.pushDeployablesMetadata(processExec.getOutputPath(), commandOptions.dryRun());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import clusterless.substrate.aws.cdk.BaseCDKCommand;
import clusterless.substrate.aws.cdk.CDKCommand;
import clusterless.substrate.aws.cdk.CDKProcessExec;
import clusterless.substrate.aws.meta.Metadata;
import picocli.CommandLine;

import java.util.concurrent.Callable;
Expand All @@ -28,12 +29,18 @@ public class Destroy extends BaseCDKCommand implements Callable<Integer> {

@Override
public Integer call() throws Exception {
return processExec.executeLifecycleProcess(
Integer exitCode = processExec.executeLifecycleProcess(
getCommonConfig(),
getProviderConfig(),
commandOptions,
CDKCommand.Destroy,
CDKCommand.DESTROY,
getRequireDestroyApproval(commandOptions.approve().orElse(null))
);

if (exitCode != 0) {
return exitCode;
}

return Metadata.removeDeployablesMetadata(processExec.getOutputPath(), commandOptions.dryRun());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,6 @@ public class Diff extends BaseCDKCommand implements Callable<Integer> {

@Override
public Integer call() throws Exception {
return processExec.executeLifecycleProcess(getCommonConfig(), getProviderConfig(), commandOptions, CDKCommand.Diff);
return processExec.executeLifecycleProcess(getCommonConfig(), getProviderConfig(), commandOptions, CDKCommand.DIFF);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public Integer call() throws Exception {
getCommonConfig(),
getProviderConfig(),
commandOptions,
CDKCommand.Import,
CDKCommand.IMPORT,
args
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
import clusterless.command.ProjectCommandOptions;
import clusterless.model.deploy.Deployable;
import clusterless.substrate.aws.cdk.BaseCDKCommand;
import clusterless.substrate.aws.cdk.CDKCommand;
import clusterless.substrate.aws.cdk.CDKProcessExec;
import clusterless.substrate.aws.meta.Metadata;
import clusterless.substrate.aws.util.TagsUtil;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -68,6 +71,11 @@ public Integer call() throws IOException {

lifecycle.synthProjectModels(deployables);

CDKCommand cdkCommand = CDKProcessExec.currentCommand();
if (cdkCommand == CDKCommand.DEPLOY || cdkCommand == CDKCommand.DESTROY) {
Metadata.writeProjectMetaLocal(deployables);
}

return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ public Verify() {

@Override
public Integer call() {
return processExec.executeLifecycleProcess(getCommonConfig(), getProviderConfig(), commandOptions, CDKCommand.Synth);
return processExec.executeLifecycleProcess(getCommonConfig(), getProviderConfig(), commandOptions, CDKCommand.SYNTH);
}
}
Loading

0 comments on commit 0f344d5

Please sign in to comment.