diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5a7dc41 --- /dev/null +++ b/.gitignore @@ -0,0 +1,5 @@ +/.idea +/target +/*.iml +.*~ +*~ diff --git a/README.md b/README.md index a4fb283..d5ba5b1 100644 --- a/README.md +++ b/README.md @@ -1,113 +1,169 @@ # DynamoDB Import Export Tool -The DynamoDB Import Export Tool is designed to perform parallel scans on the source table, store scan results in a queue, then consume the queue by writing the items asynchronously to a destination table. +The DynamoDB Import Export Tool is designed to perform parallel scans on the source table, +store scan results in a queue, then consume the queue by writing the items asynchronously to a destination table. ## Requirements ## * Maven * JRE 1.7+ -* Pre-existing source and destination DynamoDB tables +* Pre-existing source DynamoDB tables. The destination table is optional in the CLI; you can choose to create the +destination table if it does not exist. ## Running as an executable - -1. Build the library: - -``` - mvn install +1. Build the library with `mvn install`. This produces the target jar in the target/ directory. +The CLI's usage follows with required parameters marked by asterisks. + +```bash + --consistentScan + Use this flag to use strongly consistent scan. If the flag is not used + it will default to eventually consistent scan + Default: false + --copyStreamSpecificationWhenCreating + Use the source table stream specification for the destination table + during its creation. 
+ Default: false + --createAllGsi + Create all GSI in destination table + Default: false + --createAllLsi + Create all LSI in destination table + Default: false + --createDestination + Create destination table if it does not exist + Default: false + --destinationEndpoint + Endpoint of the destination table + * --destinationSigningRegion + Signing region for the destination endpoint + * --destinationTable + Name of the destination table + --help + Display usage information + --includeGsi + Include the following GSI in the destination table + --includeLsi + Include the following LSI in the destination table + --maxWriteThreads + Number of max threads to write to destination table + Default: 1024 + * --readThroughputRatio + Percentage of total read throughput to scan the source table + Default: 0.0 + --section + Section number to scan when running multiple programs concurrently [0, + 1... totalSections-1] + Default: 0 + --sourceEndpoint + Endpoint of the source table + * --sourceSigningRegion + Signing region for the source endpoint + * --sourceTable + Name of the source table + --totalSections + Total number of sections to divide the scan into + Default: 1 + * --writeThroughputRatio + Percentage of total write throughput to write the destination table + Default: 0.0 ``` -2. This produces the target jar in the target/ directory, to start the replication process: - -java -jar dynamodb-import-export-tool.jar - ---destinationEndpoint // the DynamoDB endpoint where the destination table is located. - ---destinationTable // the destination table to write to. - ---sourceEndpoint // the endpoint where the source table is located. - ---sourceTable // the source table to read from. - ---readThroughputRatio // the ratio of read throughput to consume from the source table. - ---writeThroughputRatio // the ratio of write throughput to consume from the destination table. 
- ---maxWriteThreads // (Optional, default=128 * Available_Processors) Maximum number of write threads to create. - ---totalSections // (Optional, default=1) Total number of sections to split the bootstrap into. Each application will only scan and write one section. - ---section // (Optional, default=0) section to read and write. Only will scan this one section of all sections, [0...totalSections-1]. - ---consistentScan // (Optional, default=false) indicates whether consistent scan should be used when reading from the source table. +2. An example command you can use on one EC2 host to copy from one table `foo` in `us-east-1` to a new table +called `bar` in `us-east-2` follows. + +```bash +java -jar target/dynamodb-import-export-tool-1.1.0.jar \ +--sourceSigningRegion us-east-1 \ +--sourceTable foo \ +--destinationSigningRegion us-east-2 \ +--destinationTable bar \ +--readThroughputRatio 1 \ +--writeThroughputRatio 1 +``` -> **NOTE**: To split the replication process across multiple machines, simply use the totalSections & section command line arguments, where each machine will run one section out of [0 ... totalSections-1]. +> **NOTE**: To split the replication process across multiple machines, simply use the totalSections & section +command line arguments, where each machine will run one section out of [0 ... totalSections-1]. ## Using the API +Find some examples of how to use the Import-Export tool's API below. +The first demonstrates how to use the API to copy data from one DynamoDB table to another. +The second demonstrates how to enqueue the data in a DynamoDB table in a +`BlockingQueueConsumer` in memory. ### 1. Transfer Data from One DynamoDB Table to Another DynamoDB Table -The below example will read from "mySourceTable" at 100 reads per second, using 4 threads. And it will write to "myDestinationTable" at 50 writes per second, using 8 threads. -Both tables are located at "dynamodb.us-west-1.amazonaws.com". 
(to transfer to a different region, create 2 AmazonDynamoDBClients +The below example will read from "mySourceTable" at 100 reads per second, using four threads. +And it will write to "myDestinationTable" at 50 writes per second, using eight threads. +Both tables are located at "dynamodb.us-west-1.amazonaws.com". +To transfer to a different region, create two AmazonDynamoDBClients with different endpoints to pass into the DynamoDBBootstrapWorker and the DynamoDBConsumer. ```java -AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider()); -client.setEndpoint("dynamodb.us-west-1.amazonaws.com"); - -DynamoDBBootstrapWorker worker = null; - -try { - // 100.0 read operations per second. 4 threads to scan the table. - worker = new DynamoDBBootstrapWorker(client, - 100.0, "mySourceTable", 4); -} catch (NullReadCapacityException e) { - LOGGER.error("The DynamoDB source table returned a null read capacity.", e); - System.exit(1); -} - - // 50.0 write operations per second. 8 threads to scan the table. 
-DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", 50.0, Executors.newFixedThreadPool(8)); - -try { - worker.pipe(consumer); -} catch (ExecutionException e) { - LOGGER.error("Encountered exception when executing transfer.", e); - System.exit(1); -} catch (InterruptedException e){ - LOGGER.error("Interrupted when executing transfer.", e); - System.exit(1); +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker; +import com.amazonaws.dynamodb.bootstrap.consumer.DynamoDBConsumer; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +class TransferDataFromOneTableToAnother { + public static void main(String[] args) { + final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() + .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + try { + // 100.0 read operations per second. 4 threads to scan the table. + final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client, + 100.0, "mySourceTable", 4); + // 50.0 write operations per second. 8 threads to scan the table. + final DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", + 50.0, Executors.newFixedThreadPool(8)); + worker.pipe(consumer); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } catch (ExecutionException | InterruptedException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } + } } ``` ### 2. Transfer Data From one DynamoDB Table to a Blocking Queue. -The below example will read from a DynamoDB table and export to an array blocking queue. 
This is useful for when another application would like to consume -the DynamoDB entries but does not have a setup application for it. They can just retrieve the queue (consumer.getQueue()) and then continually pop() from it +The below example will read from a DynamoDB table and export to an array blocking queue. +This is useful for when another application would like to consume +the DynamoDB entries but does not have a setup application for it. +They can just retrieve the queue (consumer.getQueue()) and then continually pop() from it to then process the new entries. ```java -AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider()); -client.setEndpoint("dynamodb.us-west-1.amazonaws.com"); - -DynamoDBBootstrapWorker worker = null; - -try { - // 100.0 read operations per second. 4 threads to scan the table. - worker = new DynamoDBBootstrapWorker(client, - 100.0, "mySourceTable", 4); -} catch (NullReadCapacityException e) { - LOGGER.error("The DynamoDB source table returned a null read capacity.", e); - System.exit(1); -} - -BlockingQueueConsumer consumer = new BlockingQueueConsumer(8); - -try { - worker.pipe(consumer); -} catch (ExecutionException e) { - LOGGER.error("Encountered exception when executing transfer.", e); - System.exit(1); -} catch (InterruptedException e){ - LOGGER.error("Interrupted when executing transfer.", e); - System.exit(1); +import com.amazonaws.dynamodb.bootstrap.consumer.BlockingQueueConsumer; +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +import java.util.concurrent.ExecutionException; + +class TransferDataFromOneTableToBlockingQueue { + public static void main(String[] args) { + final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard() + .withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + try { 
+ // 100.0 read operations per second. 4 threads to scan the table. + final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client, 100.0, + "mySourceTable", 4); + final BlockingQueueConsumer consumer = new BlockingQueueConsumer(8); + worker.pipe(consumer); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } catch (ExecutionException | InterruptedException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } + } } ``` diff --git a/pom.xml b/pom.xml index a0360bb..eb60c2d 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,7 @@ 4.0.0 com.amazonaws - 1.0.1 + 1.1.0 dynamodb-import-export-tool jar DynamoDB Import Export Tool @@ -11,14 +11,17 @@ https://github.com/awslabs/dynamodb-import-export-tool.git - 1.10.10 - 1.6.2 - 1.48 - 15.0 + 1.8 + 1.11.123 + 1.6.6 + 1.69 + 21.0 1.2.17 - 3.2 + 3.4 1.2 - 2.4.1 + 3.0.0 + 3.0 + 1.6 true @@ -84,6 +87,11 @@ log4j ${log4j.core.version} + + org.projectlombok + lombok + 1.16.14 + org.powermock powermock-module-junit4 @@ -109,14 +117,15 @@ maven-compiler-plugin - 1.7 - 1.7 + ${jdk.version} + ${jdk.version} - 3.0 + ${maven.compiler.version} org.apache.maven.plugins maven-gpg-plugin + ${maven.gpg.version} sign-artifacts diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java index 40c4883..80a03b5 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java @@ -22,6 +22,7 @@ import org.apache.log4j.Logger; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.dynamodb.bootstrap.consumer.AbstractLogConsumer; /** * Abstract class to send inputs from a source to a consumer. 
@@ -31,8 +32,7 @@ public abstract class AbstractLogProvider { /** * Logger for the DynamoDBBootstrapWorker. */ - private static final Logger LOGGER = LogManager - .getLogger(AbstractLogProvider.class); + private static final Logger LOGGER = LogManager.getLogger(AbstractLogProvider.class); protected ExecutorService threadPool; @@ -40,16 +40,14 @@ public abstract class AbstractLogProvider { * Begins to read log results and transfer them to the consumer who will * write the results. */ - public abstract void pipe(final AbstractLogConsumer consumer) - throws ExecutionException, InterruptedException; + public abstract void pipe(final AbstractLogConsumer consumer) throws ExecutionException, InterruptedException; /** * Shuts the thread pool down. - * - * @param - * If true, this method waits for the threads in the pool to - * finish. If false, this thread pool shuts down without - * finishing their current tasks. + * + * @param awaitTermination If true, this method waits for the threads in the pool to + * finish. If false, this thread pool shuts down without + * finishing their current tasks. */ public void shutdown(boolean awaitTermination) { if (awaitTermination) { @@ -61,8 +59,7 @@ public void shutdown(boolean awaitTermination) { } } catch (InterruptedException e) { interrupted = true; - LOGGER.warn("Threadpool was interrupted when trying to shutdown: " - + e.getMessage()); + LOGGER.warn("Threadpool was interrupted when trying to shutdown: " + e.getMessage()); } finally { if (interrupted) Thread.currentThread().interrupt(); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java deleted file mode 100644 index 31c2c6e..0000000 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Amazon Software License (the "License"). 
- * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/asl/ - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ -package com.amazonaws.dynamodb.bootstrap; - -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - -import com.amazonaws.services.dynamodbv2.model.AttributeValue; -import com.fasterxml.jackson.annotation.JsonProperty; - - -/** - * Mixin for attribute values to stay all capital when mapping them as strings. - * - */ -public abstract class AttributeValueMixIn { - @JsonProperty("S") public abstract String getS(); - @JsonProperty("S") public abstract void setS(String s); - @JsonProperty("N") public abstract String getN(); - @JsonProperty("N") public abstract void setN(String n); - @JsonProperty("B") public abstract ByteBuffer getB(); - @JsonProperty("B") public abstract void setB(ByteBuffer b); - @JsonProperty("NULL") public abstract Boolean isNULL(); - @JsonProperty("NULL") public abstract void setNULL(Boolean nU); - @JsonProperty("BOOL") public abstract Boolean getBOOL(); - @JsonProperty("BOOL") public abstract void setBOOL(Boolean bO); - @JsonProperty("SS") public abstract List getSS(); - @JsonProperty("SS") public abstract void setSS(List sS); - @JsonProperty("NS") public abstract List getNS(); - @JsonProperty("NS") public abstract void setNS(List nS); - @JsonProperty("BS") public abstract List getBS(); - @JsonProperty("BS") public abstract void setBS(List bS); - @JsonProperty("M") public abstract Map getM(); - @JsonProperty("M") public abstract void setM(Map val); - @JsonProperty("L") public abstract List getL(); - @JsonProperty("L") public abstract void setL(List val); -} \ No newline at end of file diff --git 
a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java index 1b41a91..66f2c76 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java @@ -14,13 +14,18 @@ */ package com.amazonaws.dynamodb.bootstrap; +import java.util.List; + import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; import com.beust.jcommander.Parameter; +import lombok.Getter; + /** * This class contains the parameters to input when executing the program from * command line. */ +@Getter public class CommandLineArgs { public static final String HELP = "--help"; @Parameter(names = HELP, description = "Display usage information", help = true) @@ -31,82 +36,74 @@ public boolean getHelp() { } public static final String SOURCE_ENDPOINT = "--sourceEndpoint"; - @Parameter(names = SOURCE_ENDPOINT, description = "Endpoint of the source table", required = true) + @Parameter(names = SOURCE_ENDPOINT, description = "Endpoint of the source table") private String sourceEndpoint; - public String getSourceEndpoint() { - return sourceEndpoint; - } + public static final String SOURCE_SIGNING_REGION = "--sourceSigningRegion"; + @Parameter(names = SOURCE_SIGNING_REGION, description = "Signing region for the source endpoint", required = true) + private String sourceSigningRegion; public static final String SOURCE_TABLE = "--sourceTable"; @Parameter(names = SOURCE_TABLE, description = "Name of the source table", required = true) private String sourceTable; - public String getSourceTable() { - return sourceTable; - } - public static final String DESTINATION_ENDPOINT = "--destinationEndpoint"; - @Parameter(names = DESTINATION_ENDPOINT, description = "Endpoint of the destination table", required = true) + @Parameter(names = DESTINATION_ENDPOINT, description = "Endpoint of the destination table") private String destinationEndpoint; - 
public String getDestinationEndpoint() { - return destinationEndpoint; - } + public static final String DESTINATION_SIGNING_REGION = "--destinationSigningRegion"; + @Parameter(names = DESTINATION_SIGNING_REGION, description = "Signing region for the destination endpoint", required = true) + private String destinationSigningRegion; public static final String DESTINATION_TABLE = "--destinationTable"; @Parameter(names = DESTINATION_TABLE, description = "Name of the destination table", required = true) private String destinationTable; - public String getDestinationTable() { - return destinationTable; - } + public static final String CREATE_DESTINATION_TABLE_IF_MISSING = "--createDestination"; + @Parameter(names = CREATE_DESTINATION_TABLE_IF_MISSING, description = "Create destination table if it does not exist") + private boolean createDestinationTableIfMissing; + + public static final String CREATE_ALL_LSI = "--createAllLsi"; + @Parameter(names = CREATE_ALL_LSI, description = "Create all LSI in destination table") + private boolean createAllLsi; + + public static final String CREATE_ALL_GSI = "--createAllGsi"; + @Parameter(names = CREATE_ALL_GSI, description = "Create all GSI in destination table") + private boolean createAllGsi; + + public static final String INCLUDE_LSI = "--includeLsi"; + @Parameter(names = INCLUDE_LSI, description = "Include the following LSI in the destination table") + private List includeLsi; + + public static final String INCLUDE_GSI = "--includeGsi"; + @Parameter(names = INCLUDE_GSI, description = "Include the following GSI in the destination table") + private List includeGsi; + + public static final String COPY_STREAM_SPECIFICATION_WHEN_CREATING = "--copyStreamSpecificationWhenCreating"; + @Parameter(names = COPY_STREAM_SPECIFICATION_WHEN_CREATING, description = "Use the source table stream specification for the destination table during its creation.") + private boolean copyStreamSpecification; public static final String READ_THROUGHPUT_RATIO 
= "--readThroughputRatio"; @Parameter(names = READ_THROUGHPUT_RATIO, description = "Percentage of total read throughput to scan the source table", required = true) private double readThroughputRatio; - public double getReadThroughputRatio() { - return readThroughputRatio; - } - public static final String WRITE_THROUGHPUT_RATIO = "--writeThroughputRatio"; @Parameter(names = WRITE_THROUGHPUT_RATIO, description = "Percentage of total write throughput to write the destination table", required = true) private double writeThroughputRatio; - public double getWriteThroughputRatio() { - return writeThroughputRatio; - } - public static final String MAX_WRITE_THREADS = "--maxWriteThreads"; @Parameter(names = MAX_WRITE_THREADS, description = "Number of max threads to write to destination table", required = false) private int maxWriteThreads = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE; - public int getMaxWriteThreads() { - return maxWriteThreads; - } - public static final String TOTAL_SECTIONS = "--totalSections"; @Parameter(names = TOTAL_SECTIONS, description = "Total number of sections to divide the scan into", required = false) private int totalSections = 1; - public int getTotalSections() { - return totalSections; - } - public static final String SECTION = "--section"; @Parameter(names = SECTION, description = "Section number to scan when running multiple programs concurrently [0, 1... totalSections-1]", required = false) private int section = 0; - public int getSection() { - return section; - } - public static final String CONSISTENT_SCAN = "--consistentScan"; @Parameter(names = CONSISTENT_SCAN, description = "Use this flag to use strongly consistent scan. 
If the flag is not used it will default to eventually consistent scan") private boolean consistentScan = false; - - public boolean getConsistentScan() { - return consistentScan; - } } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java index 67639fc..64fc187 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java @@ -14,41 +14,157 @@ */ package com.amazonaws.dynamodb.bootstrap; +import java.util.Optional; +import java.util.SortedSet; +import java.util.TreeSet; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; - +import com.amazonaws.AmazonServiceException; import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.dynamodb.bootstrap.consumer.DynamoDBConsumer; import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.dynamodbv2.model.DescribeTableRequest; +import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; import 
com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.amazonaws.waiters.WaiterParameters; +import com.amazonaws.waiters.WaiterTimedOutException; +import com.amazonaws.waiters.WaiterUnrecoverableException; import com.beust.jcommander.JCommander; import com.beust.jcommander.ParameterException; +import com.google.common.base.Preconditions; + +import lombok.NonNull; +import lombok.extern.log4j.Log4j; /** * The interface that parses the arguments, and begins to transfer data from one * DynamoDB table to another */ +@Log4j public class CommandLineInterface { - /** - * Logger for the DynamoDBBootstrapWorker. - */ - private static final Logger LOGGER = LogManager - .getLogger(CommandLineInterface.class); + public static final String ENCOUNTERED_EXCEPTION_WHEN_EXECUTING_TRANSFER = "Encountered exception when executing transfer."; + + static AwsClientBuilder.EndpointConfiguration createEndpointConfiguration(Region region, Optional endpoint, String endpointPrefix) { + return new AwsClientBuilder.EndpointConfiguration(endpoint.orElse("https://" + region.getServiceEndpoint(endpointPrefix)), region.getName()); + } + + @NonNull + private final AwsClientBuilder.EndpointConfiguration sourceEndpointConfiguration; + @NonNull + private final String sourceTable; + @NonNull + private final AwsClientBuilder.EndpointConfiguration destinationEndpointConfiguration; + @NonNull + private final String destinationTable; + private final double readThroughputRatio; + private final double writeThroughputRatio; + private final int maxWriteThreads; + private final boolean isConsistentScan; + private final boolean createDestinationTableIfMissing; + private final boolean createAllGsi; + private final boolean createAllLsi; + @NonNull + private final SortedSet includeGsi; + @NonNull + private final SortedSet includeLsi; + private final boolean copyStreamSpecification; + private final int sectionNumber; + private final int totalSections; + + private CommandLineInterface(final 
CommandLineArgs params) { + sourceEndpointConfiguration = + createEndpointConfiguration(Region.getRegion(Regions.fromName(params.getSourceSigningRegion())), Optional.ofNullable(params.getSourceEndpoint()), + AmazonDynamoDB.ENDPOINT_PREFIX); + sourceTable = params.getSourceTable(); + destinationEndpointConfiguration = + createEndpointConfiguration(Region.getRegion(Regions.fromName(params.getDestinationSigningRegion())), Optional.ofNullable(params.getDestinationEndpoint()), + AmazonDynamoDB.ENDPOINT_PREFIX); + destinationTable = params.getDestinationTable(); + readThroughputRatio = params.getReadThroughputRatio(); + writeThroughputRatio = params.getWriteThroughputRatio(); + maxWriteThreads = params.getMaxWriteThreads(); + isConsistentScan = params.isConsistentScan(); + createDestinationTableIfMissing = params.isCreateDestinationTableIfMissing(); + createAllGsi = params.isCreateAllGsi(); + createAllLsi = params.isCreateAllLsi(); + includeLsi = new TreeSet<>(params.getIncludeLsi()); + Preconditions.checkArgument(includeLsi.size() == params.getIncludeLsi().size(), "list of LSI names must be unique"); + includeGsi = new TreeSet<>(params.getIncludeGsi()); + Preconditions.checkArgument(includeGsi.size() == params.getIncludeGsi().size(), "list of GSI names must be unique"); + copyStreamSpecification = params.isCopyStreamSpecification(); + sectionNumber = params.getSection(); + totalSections = params.getTotalSections(); + } + + private void bootstrapTable() throws InterruptedException, ExecutionException, SectionOutOfRangeException { + final ClientConfiguration config = new ClientConfiguration().withMaxConnections(BootstrapConstants.MAX_CONN_SIZE); + + final DefaultAWSCredentialsProviderChain credentials = new DefaultAWSCredentialsProviderChain(); + final AmazonDynamoDB sourceClient = + AmazonDynamoDBClientBuilder.standard().withEndpointConfiguration(sourceEndpointConfiguration).withCredentials(credentials).withClientConfiguration(config).build(); + final AmazonDynamoDB 
destinationClient = + AmazonDynamoDBClientBuilder.standard().withEndpointConfiguration(destinationEndpointConfiguration).withCredentials(credentials).withClientConfiguration(config).build(); + + final TableDescription readTableDescription = sourceClient.describeTable(sourceTable).getTable(); + try { + destinationClient.describeTable(destinationTable).getTable(); + } catch (ResourceNotFoundException e) { + if (!createDestinationTableIfMissing) { + throw new IllegalArgumentException("Destination table " + destinationTable + " did not exist", e); + } + try { + final TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().copyStreamSpecification(copyStreamSpecification).newTableName(destinationTable) + .createAllGsi(createAllGsi).gsiToInclude(includeGsi).createAllLsi(createAllLsi).lsiToInclude(includeLsi).build(); + destinationClient.createTable(converter.apply(readTableDescription)); + destinationClient.waiters().tableExists().run(new WaiterParameters<>(new DescribeTableRequest(destinationTable))); + } catch (WaiterUnrecoverableException | WaiterTimedOutException | AmazonServiceException ase) { + throw new IllegalArgumentException("Unable to create destination table", ase); + } + } + + final TableDescription writeTableDescription = destinationClient.describeTable(destinationTable).getTable(); + + final int numSegments; + try { + numSegments = DynamoDBBootstrapWorker.estimateNumberOfSegments(readTableDescription); + } catch (NullReadCapacityException e) { + throw new IllegalStateException("All tables should have a read capacity set", e); + } + + final double readThroughput = calculateThroughput(readTableDescription, readThroughputRatio, true); + final double writeThroughput = calculateThroughput(writeTableDescription, writeThroughputRatio, false); + + final ExecutorService sourceExec = getThreadPool(numSegments); + final DynamoDBBootstrapWorker worker = + new DynamoDBBootstrapWorker(sourceClient, 
readThroughput, sourceTable, sourceExec, sectionNumber, totalSections, numSegments, isConsistentScan); + + final ExecutorService destinationExec = getThreadPool(maxWriteThreads); + final DynamoDBConsumer consumer = new DynamoDBConsumer(destinationClient, destinationTable, writeThroughput, destinationExec); + + log.info("Starting transfer."); + worker.pipe(consumer); + log.info("Finished transfer."); + } /** * Main class to begin transferring data from one DynamoDB table to another * DynamoDB table. - * + * * @param args */ public static void main(String[] args) { @@ -59,8 +175,7 @@ public static void main(String[] args) { // parse given arguments cmd.parse(args); } catch (ParameterException e) { - LOGGER.error(e); - JCommander.getConsole().println(e.getMessage()); + log.error(e); cmd.usage(); System.exit(1); } @@ -68,65 +183,21 @@ public static void main(String[] args) { // show usage information if help flag exists if (params.getHelp()) { cmd.usage(); - return; - } - final String sourceEndpoint = params.getSourceEndpoint(); - final String destinationEndpoint = params.getDestinationEndpoint(); - final String destinationTable = params.getDestinationTable(); - final String sourceTable = params.getSourceTable(); - final double readThroughputRatio = params.getReadThroughputRatio(); - final double writeThroughputRatio = params.getWriteThroughputRatio(); - final int maxWriteThreads = params.getMaxWriteThreads(); - final boolean consistentScan = params.getConsistentScan(); - - final ClientConfiguration sourceConfig = new ClientConfiguration().withMaxConnections(BootstrapConstants.MAX_CONN_SIZE); - final ClientConfiguration destinationConfig = new ClientConfiguration().withMaxConnections(BootstrapConstants.MAX_CONN_SIZE); - - final AmazonDynamoDBClient sourceClient = new AmazonDynamoDBClient( - new DefaultAWSCredentialsProviderChain(), sourceConfig); - final AmazonDynamoDBClient destinationClient = new AmazonDynamoDBClient( - new DefaultAWSCredentialsProviderChain(), 
destinationConfig); - sourceClient.setEndpoint(sourceEndpoint); - destinationClient.setEndpoint(destinationEndpoint); - - TableDescription readTableDescription = sourceClient.describeTable( - sourceTable).getTable(); - TableDescription writeTableDescription = destinationClient - .describeTable(destinationTable).getTable(); - int numSegments = 10; - try { - numSegments = DynamoDBBootstrapWorker - .getNumberOfSegments(readTableDescription); - } catch (NullReadCapacityException e) { - LOGGER.warn("Number of segments not specified - defaulting to " - + numSegments, e); + System.exit(0); } - final double readThroughput = calculateThroughput(readTableDescription, - readThroughputRatio, true); - final double writeThroughput = calculateThroughput( - writeTableDescription, writeThroughputRatio, false); - + final CommandLineInterface cli = new CommandLineInterface(params); try { - ExecutorService sourceExec = getSourceThreadPool(numSegments); - ExecutorService destinationExec = getDestinationThreadPool(maxWriteThreads); - DynamoDBConsumer consumer = new DynamoDBConsumer(destinationClient, - destinationTable, writeThroughput, destinationExec); - - final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker( - sourceClient, readThroughput, sourceTable, sourceExec, - params.getSection(), params.getTotalSections(), numSegments, consistentScan); - - LOGGER.info("Starting transfer..."); - worker.pipe(consumer); - LOGGER.info("Finished Copying Table."); - } catch (ExecutionException e) { - LOGGER.error("Encountered exception when executing transfer.", e); + cli.bootstrapTable(); } catch (InterruptedException e) { - LOGGER.error("Interrupted when executing transfer.", e); + log.error("Interrupted when executing transfer.", e); System.exit(1); } catch (SectionOutOfRangeException e) { - LOGGER.error("Invalid section parameter", e); + log.error("Invalid section parameter", e); + System.exit(1); + } catch (Exception e) { //coalesces ExecutionException + 
log.error(ENCOUNTERED_EXCEPTION_WHEN_EXECUTING_TRANSFER, e); + System.exit(1); } } @@ -134,47 +205,24 @@ public static void main(String[] args) { * returns the provisioned throughput based on the input ratio and the * specified DynamoDB table provisioned throughput. */ - private static double calculateThroughput( - TableDescription tableDescription, double throughputRatio, - boolean read) { + private static double calculateThroughput(TableDescription tableDescription, double throughputRatio, boolean read) { if (read) { - return tableDescription.getProvisionedThroughput() - .getReadCapacityUnits() * throughputRatio; + return tableDescription.getProvisionedThroughput().getReadCapacityUnits() * throughputRatio; } - return tableDescription.getProvisionedThroughput() - .getWriteCapacityUnits() * throughputRatio; + return tableDescription.getProvisionedThroughput().getWriteCapacityUnits() * throughputRatio; } /** * Returns the thread pool for the destination DynamoDB table. */ - private static ExecutorService getDestinationThreadPool(int maxWriteThreads) { + private static ExecutorService getThreadPool(int maxWriteThreads) { int corePoolSize = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE; if (corePoolSize > maxWriteThreads) { corePoolSize = maxWriteThreads - 1; } final long keepAlive = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_KEEP_ALIVE; - ExecutorService exec = new ThreadPoolExecutor(corePoolSize, - maxWriteThreads, keepAlive, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue(maxWriteThreads), - new ThreadPoolExecutor.CallerRunsPolicy()); - return exec; - } - - /** - * Returns the thread pool for the source DynamoDB table. 
- */ - private static ExecutorService getSourceThreadPool(int numSegments) { - int corePoolSize = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE; - if (corePoolSize > numSegments) { - corePoolSize = numSegments - 1; - } - - final long keepAlive = BootstrapConstants.DYNAMODB_CLIENT_EXECUTOR_KEEP_ALIVE; - ExecutorService exec = new ThreadPoolExecutor(corePoolSize, - numSegments, keepAlive, TimeUnit.MILLISECONDS, - new ArrayBlockingQueue(numSegments), - new ThreadPoolExecutor.CallerRunsPolicy()); + ExecutorService exec = new ThreadPoolExecutor(corePoolSize, maxWriteThreads, keepAlive, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(maxWriteThreads), + new ThreadPoolExecutor.CallerRunsPolicy()); return exec; } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java index 7ebe248..1ede697 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java @@ -16,7 +16,8 @@ import java.util.concurrent.Executor; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.dynamodb.bootstrap.worker.ScanSegmentWorker; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.google.common.util.concurrent.RateLimiter; @@ -26,12 +27,12 @@ public class DynamoDBTableScan { private final RateLimiter rateLimiter; - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; /** * Initializes the RateLimiter and sets the AmazonDynamoDBClient. 
*/ - public DynamoDBTableScan(double rateLimit, AmazonDynamoDBClient client) { + public DynamoDBTableScan(double rateLimit, AmazonDynamoDB client) { rateLimiter = RateLimiter.create(rateLimit); this.client = client; } @@ -39,19 +40,15 @@ public DynamoDBTableScan(double rateLimit, AmazonDynamoDBClient client) { /** * This function copies a scan request for the number of segments and then * adds those workers to the executor service to begin scanning. - * + * * @param totalSections * @param section - * * @return the parallel scan executor to grab results - * when a segment is finished. + * when a segment is finished. */ - public ParallelScanExecutor getParallelScanCompletionService( - ScanRequest initialRequest, int numSegments, Executor executor, - int section, int totalSections) { + public ParallelScanExecutor getParallelScanCompletionService(ScanRequest initialRequest, int numSegments, Executor executor, int section, int totalSections) { final int segments = Math.max(1, numSegments); - final ParallelScanExecutor completion = new ParallelScanExecutor( - executor, segments); + final ParallelScanExecutor completion = new ParallelScanExecutor(executor, segments); int sectionSize = segments / totalSections; int start = sectionSize * section; @@ -61,22 +58,15 @@ public ParallelScanExecutor getParallelScanCompletionService( } for (int segment = start; segment < end; segment++) { - ScanRequest scanSegment = copyScanRequest(initialRequest) - .withTotalSegments(segments).withSegment(segment); - completion.addWorker(new ScanSegmentWorker(this.client, - this.rateLimiter, scanSegment), segment); + ScanRequest scanSegment = copyScanRequest(initialRequest).withTotalSegments(segments).withSegment(segment); + completion.addWorker(new ScanSegmentWorker(this.client, this.rateLimiter, scanSegment), segment); } return completion; } public ScanRequest copyScanRequest(ScanRequest request) { - return new ScanRequest() - .withTableName(request.getTableName()) - 
.withTotalSegments(request.getTotalSegments()) - .withSegment(request.getSegment()) - .withReturnConsumedCapacity(request.getReturnConsumedCapacity()) - .withLimit(request.getLimit()) - .withConsistentRead(request.getConsistentRead()); + return new ScanRequest().withTableName(request.getTableName()).withTotalSegments(request.getTotalSegments()).withSegment(request.getSegment()) + .withReturnConsumedCapacity(request.getReturnConsumedCapacity()).withLimit(request.getLimit()).withConsistentRead(request.getConsistentRead()); } } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java index 3424221..01c33a5 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java @@ -15,16 +15,17 @@ package com.amazonaws.dynamodb.bootstrap; import java.util.BitSet; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; -import java.util.concurrent.ExecutionException; + +import com.amazonaws.dynamodb.bootstrap.worker.ScanSegmentWorker; /** * This class executes multiple scan requests on one segment of a table in * series, as a runnable. Instances meant to be used as tasks of the worker * thread pool for parallel scans. 
- * */ public class ParallelScanExecutor { private final BitSet finished; @@ -44,8 +45,7 @@ public ParallelScanExecutor(Executor executor, int segments) { public void finishSegment(int segment) { synchronized (finished) { if (segment > finished.size()) { - throw new IllegalArgumentException( - "Invalid segment passed to finishSegment"); + throw new IllegalArgumentException("Invalid segment passed to finishSegment"); } finished.set(segment); } @@ -63,15 +63,12 @@ public boolean finished() { /** * This method gets a segmentedScanResult and submits the next scan request * for that segment, if there is one. - * + * * @return the next available ScanResult - * @throws ExecutionException - * if one of the segment pages threw while executing - * @throws InterruptedException - * if one of the segment pages was interrupted while executing. + * @throws ExecutionException if one of the segment pages threw while executing + * @throws InterruptedException if one of the segment pages was interrupted while executing. 
*/ - public SegmentedScanResult grab() throws ExecutionException, - InterruptedException { + public SegmentedScanResult grab() throws ExecutionException, InterruptedException { Future ret = exec.take(); int segment = ret.get().getSegment(); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java index 6f5b356..cbdfb43 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java @@ -18,7 +18,6 @@ /** * Encapsulates segment number in scan result - * */ public class SegmentedScanResult { private final ScanResult result; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverter.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverter.java new file mode 100644 index 0000000..5fdfa20 --- /dev/null +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverter.java @@ -0,0 +1,93 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazonaws.dynamodb.bootstrap; + +import java.util.List; +import java.util.SortedSet; +import java.util.function.Function; +import java.util.stream.Collectors; + +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex; +import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndex; +import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import com.amazonaws.services.dynamodbv2.model.TableDescription; + +import lombok.Builder; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; + +/** + * Created by amcp on 2017/04/23. + */ +@RequiredArgsConstructor +@Builder +public class TableDescriptionToCreateTableRequestConverter implements Function { + + @NonNull + private final String newTableName; + private final boolean createAllGsi; + private final boolean createAllLsi; + private final boolean copyStreamSpecification; + @NonNull + private final SortedSet gsiToInclude; + @NonNull + private final SortedSet lsiToInclude; + + static ProvisionedThroughput getProvisionedThroughputFromDescription(ProvisionedThroughputDescription description) { + return new ProvisionedThroughput(description.getReadCapacityUnits(), description.getWriteCapacityUnits()); + } + + private static GlobalSecondaryIndex convertGlobalSecondaryIndexDescription(GlobalSecondaryIndexDescription d) { + return new GlobalSecondaryIndex().withIndexName(d.getIndexName()).withKeySchema(d.getKeySchema()).withProjection(d.getProjection()) + .withProvisionedThroughput(getProvisionedThroughputFromDescription(d.getProvisionedThroughput())); + } + + private static LocalSecondaryIndex convertLocalSecondaryIndexDescription(LocalSecondaryIndexDescription d) { 
+ return new LocalSecondaryIndex().withIndexName(d.getIndexName()).withKeySchema(d.getKeySchema()).withProjection(d.getProjection()); + } + + @Override + public CreateTableRequest apply(TableDescription description) { + final List lsiDesc = description.getLocalSecondaryIndexes(); + final List lsi; + if (lsiDesc == null || (!createAllLsi && lsiToInclude.isEmpty())) { + lsi = null; + } else { + lsi = lsiDesc.stream().filter(l -> createAllLsi || lsiToInclude.contains(l.getIndexName())) + .map(TableDescriptionToCreateTableRequestConverter::convertLocalSecondaryIndexDescription).collect(Collectors.toList()); + } + + final List gsiDesc = description.getGlobalSecondaryIndexes(); + final List gsi; + if (gsiDesc == null || (!createAllGsi && gsiToInclude.isEmpty())) { + gsi = null; + } else { + gsi = gsiDesc.stream().filter(g -> createAllGsi || gsiToInclude.contains(g.getIndexName())) + .map(TableDescriptionToCreateTableRequestConverter::convertGlobalSecondaryIndexDescription).collect(Collectors.toList()); + } + + ProvisionedThroughput pt = getProvisionedThroughputFromDescription(description.getProvisionedThroughput()); + CreateTableRequest ctr = new CreateTableRequest().withTableName(newTableName).withProvisionedThroughput(pt).withAttributeDefinitions(description.getAttributeDefinitions()) + .withKeySchema(description.getKeySchema()).withGlobalSecondaryIndexes(gsi).withLocalSecondaryIndexes(lsi); + if (copyStreamSpecification) { + ctr.withStreamSpecification(description.getStreamSpecification()); + } + return ctr; + } +} diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java index abcc53f..37e1dbe 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java @@ -34,14 +34,12 @@ public class BootstrapConstants { /** * Max ThreadPool size 
for the ExecutorService to use. */ - public static final int DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE = Runtime - .getRuntime().availableProcessors() * 128; + public static final int DYNAMODB_CLIENT_EXECUTOR_MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 128; /** * Core pool size of a default thread pool. */ - public static final int DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE = Runtime - .getRuntime().availableProcessors() * 4; + public static final int DYNAMODB_CLIENT_EXECUTOR_CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 4; /** * Amount of time in milliseconds to keep the ExecutorService alive for @@ -89,12 +87,12 @@ public class BootstrapConstants { * Max number of bytes in a DynamoDB number attribute. */ public static final int MAX_NUMBER_OF_BYTES_FOR_NUMBER = 21; - + /** * Number of bytes for an item being read with strongly consistent reads */ public static final int STRONGLY_CONSISTENT_READ_ITEM_SIZE = 4 * 1024; - + /** * Number of bytes for an item being read with eventually consistent reads */ diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/AbstractLogConsumer.java similarity index 78% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/AbstractLogConsumer.java index 960a59d..299ac98 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/AbstractLogConsumer.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; @@ -22,6 +22,7 @@ import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; /** @@ -36,26 +37,23 @@ public abstract class AbstractLogConsumer { /** * Logger for the DynamoDBBootstrapWorker. */ - private static final Logger LOGGER = LogManager - .getLogger(AbstractLogConsumer.class); + private static final Logger LOGGER = LogManager.getLogger(AbstractLogConsumer.class); /** * Writes the result of a scan to another endpoint asynchronously. Will call * getWorker to determine what job to submit with the result. - * - * @param - * the SegmentedScanResult to asynchronously write to another - * endpoint. + * + * @param the SegmentedScanResult to asynchronously write to another + * endpoint. */ public abstract Future writeResult(SegmentedScanResult result); /** * Shuts the thread pool down. - * - * @param - * If true, this method waits for the threads in the pool to - * finish. If false, this thread pool shuts down without - * finishing their current tasks. + * + * @param If true, this method waits for the threads in the pool to + * finish. If false, this thread pool shuts down without + * finishing their current tasks. 
*/ public void shutdown(boolean awaitTermination) { if (awaitTermination) { @@ -67,8 +65,7 @@ public void shutdown(boolean awaitTermination) { } } catch (InterruptedException e) { interrupted = true; - LOGGER.warn("Threadpool was interrupted when trying to shutdown: " - + e.getMessage()); + LOGGER.warn("Threadpool was interrupted when trying to shutdown: " + e.getMessage()); } finally { if (interrupted) Thread.currentThread().interrupt(); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumer.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumer.java similarity index 89% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumer.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumer.java index 11cb995..d7eb255 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumer.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumer.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -20,7 +20,9 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; -import com.amazonaws.dynamodb.bootstrap.DynamoDBEntryWithSize; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; +import com.amazonaws.dynamodb.bootstrap.items.DynamoDBEntryWithSize; +import com.amazonaws.dynamodb.bootstrap.worker.BlockingQueueWorker; /** * This class implements ILogConsumer, and when called to writeResult, it will @@ -48,8 +50,7 @@ public Future writeResult(SegmentedScanResult result) { try { jobSubmission = exec.submit(new BlockingQueueWorker(queue, result)); } catch (NullPointerException npe) { - throw new NullPointerException( - "Thread pool not initialized for LogStashExecutor"); + throw new NullPointerException("Thread pool not initialized for LogStashExecutor"); } return jobSubmission; } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumer.java similarity index 78% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumer.java index a5bfa6c..e75b6e1 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumer.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; import java.util.Iterator; import java.util.LinkedList; @@ -22,8 +22,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBConsumerWorker; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; import com.amazonaws.services.dynamodbv2.model.PutRequest; @@ -38,15 +40,14 @@ */ public class DynamoDBConsumer extends AbstractLogConsumer { - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; private final String tableName; private final RateLimiter rateLimiter; /** * Class to consume logs and write them to a DynamoDB table. 
*/ - public DynamoDBConsumer(AmazonDynamoDBClient client, String tableName, - double rateLimit, ExecutorService exec) { + public DynamoDBConsumer(AmazonDynamoDB client, String tableName, double rateLimit, ExecutorService exec) { this.client = client; this.tableName = tableName; this.rateLimiter = RateLimiter.create(rateLimit); @@ -62,17 +63,13 @@ public DynamoDBConsumer(AmazonDynamoDBClient client, String tableName, @Override public Future writeResult(SegmentedScanResult result) { Future jobSubmission = null; - List batches = splitResultIntoBatches( - result.getScanResult(), tableName); + List batches = splitResultIntoBatches(result.getScanResult(), tableName); Iterator batchesIterator = batches.iterator(); while (batchesIterator.hasNext()) { try { - jobSubmission = exec - .submit(new DynamoDBConsumerWorker(batchesIterator - .next(), client, rateLimiter, tableName)); + jobSubmission = exec.submit(new DynamoDBConsumerWorker(batchesIterator.next(), client, rateLimiter, tableName)); } catch (NullPointerException npe) { - throw new NullPointerException( - "Thread pool not initialized for LogStashExecutor"); + throw new NullPointerException("Thread pool not initialized for LogStashExecutor"); } } return jobSubmission; @@ -82,13 +79,11 @@ public Future writeResult(SegmentedScanResult result) { * Splits up a ScanResult into a list of BatchWriteItemRequests of size 25 * items or less each. 
*/ - public static List splitResultIntoBatches( - ScanResult result, String tableName) { + public static List splitResultIntoBatches(ScanResult result, String tableName) { List batches = new LinkedList(); Iterator> it = result.getItems().iterator(); - BatchWriteItemRequest req = new BatchWriteItemRequest() - .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); + BatchWriteItemRequest req = new BatchWriteItemRequest().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); List writeRequests = new LinkedList(); int i = 0; while (it.hasNext()) { @@ -99,8 +94,7 @@ public static List splitResultIntoBatches( if (i == BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM) { req.addRequestItemsEntry(tableName, writeRequests); batches.add(req); - req = new BatchWriteItemRequest() - .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); + req = new BatchWriteItemRequest().withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL); writeRequests = new LinkedList(); i = 0; } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixIn.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixIn.java new file mode 100644 index 0000000..10243c2 --- /dev/null +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixIn.java @@ -0,0 +1,88 @@ +/* + * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazonaws.dynamodb.bootstrap.items; + +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; + +import com.amazonaws.services.dynamodbv2.model.AttributeValue; +import com.fasterxml.jackson.annotation.JsonProperty; + + +/** + * Mixin for attribute values to stay all capital when mapping them as strings. + */ +public abstract class AttributeValueMixIn { + @JsonProperty("S") + public abstract String getS(); + + @JsonProperty("S") + public abstract void setS(String s); + + @JsonProperty("N") + public abstract String getN(); + + @JsonProperty("N") + public abstract void setN(String n); + + @JsonProperty("B") + public abstract ByteBuffer getB(); + + @JsonProperty("B") + public abstract void setB(ByteBuffer b); + + @JsonProperty("NULL") + public abstract Boolean isNULL(); + + @JsonProperty("NULL") + public abstract void setNULL(Boolean nU); + + @JsonProperty("BOOL") + public abstract Boolean getBOOL(); + + @JsonProperty("BOOL") + public abstract void setBOOL(Boolean bO); + + @JsonProperty("SS") + public abstract List getSS(); + + @JsonProperty("SS") + public abstract void setSS(List sS); + + @JsonProperty("NS") + public abstract List getNS(); + + @JsonProperty("NS") + public abstract void setNS(List nS); + + @JsonProperty("BS") + public abstract List getBS(); + + @JsonProperty("BS") + public abstract void setBS(List bS); + + @JsonProperty("M") + public abstract Map getM(); + + @JsonProperty("M") + public abstract void setM(Map val); + + @JsonProperty("L") + public abstract List getL(); + + @JsonProperty("L") + public abstract void setL(List val); +} \ No newline at end of file diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/DynamoDBEntryWithSize.java similarity index 95% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java rename to 
src/main/java/com/amazonaws/dynamodb/bootstrap/items/DynamoDBEntryWithSize.java index 98ee9ea..53e69cc 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/DynamoDBEntryWithSize.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.items; import java.util.Map; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/ItemSizeCalculator.java similarity index 96% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/items/ItemSizeCalculator.java index c965548..a9af6fe 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/items/ItemSizeCalculator.java @@ -12,7 +12,7 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.items; import java.nio.ByteBuffer; import java.util.Iterator; @@ -25,7 +25,6 @@ /** * Class used to calculate the size of a DynamoDB item in bytes. 
- * */ public class ItemSizeCalculator { @@ -47,17 +46,19 @@ public static int calculateItemSizeInBytes(Map item) { } return size; } - + public static int calculateScanResultSizeInBytes(ScanResult result) { final Iterator> it = result.getItems().iterator(); int totalBytes = 0; - while(it.hasNext()){ + while (it.hasNext()) { totalBytes += calculateItemSizeInBytes(it.next()); } return totalBytes; } - /** Calculate attribute value size */ + /** + * Calculate attribute value size + */ private static int calculateAttributeSizeInBytes(AttributeValue value) { int attrValSize = 0; if (value == null) { @@ -98,8 +99,7 @@ private static int calculateAttributeSizeInBytes(AttributeValue value) { } else if (value.getNULL() != null) { attrValSize += 1; } else if (value.getM() != null) { - for (Map.Entry entry : value.getM() - .entrySet()) { + for (Map.Entry entry : value.getM().entrySet()) { attrValSize += entry.getKey().getBytes(BootstrapConstants.UTF8).length; attrValSize += calculateAttributeSizeInBytes(entry.getValue()); attrValSize += BootstrapConstants.BASE_LOGICAL_SIZE_OF_NESTED_TYPES; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorker.java similarity index 82% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorker.java index b819042..19fa175 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorker.java @@ -12,17 +12,20 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; +import com.amazonaws.dynamodb.bootstrap.items.DynamoDBEntryWithSize; +import com.amazonaws.dynamodb.bootstrap.items.ItemSizeCalculator; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.ScanResult; @@ -36,14 +39,12 @@ public class BlockingQueueWorker implements Callable { /** * Logger for the LogStashQueueWorker. */ - private static final Logger LOGGER = LogManager - .getLogger(BlockingQueueWorker.class); + private static final Logger LOGGER = LogManager.getLogger(BlockingQueueWorker.class); private final BlockingQueue queue; private final SegmentedScanResult result; - public BlockingQueueWorker(BlockingQueue queue, - SegmentedScanResult result) { + public BlockingQueueWorker(BlockingQueue queue, SegmentedScanResult result) { this.queue = queue; this.result = result; } @@ -58,14 +59,11 @@ public Void call() { do { try { Map item = it.next(); - DynamoDBEntryWithSize entryWithSize = new DynamoDBEntryWithSize( - item, - ItemSizeCalculator.calculateItemSizeInBytes(item)); + DynamoDBEntryWithSize entryWithSize = new DynamoDBEntryWithSize(item, ItemSizeCalculator.calculateItemSizeInBytes(item)); queue.put(entryWithSize); } catch (InterruptedException e) { interrupted = true; - LOGGER.warn("interrupted when writing item to queue: " - + e.getMessage()); + LOGGER.warn("interrupted when writing item to queue: " + e.getMessage()); } } while (it.hasNext()); } finally { diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java 
b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorker.java similarity index 68% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorker.java index 10278ef..a74bb58 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorker.java @@ -12,16 +12,21 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.amazonaws.dynamodb.bootstrap.AbstractLogProvider; +import com.amazonaws.dynamodb.bootstrap.DynamoDBTableScan; +import com.amazonaws.dynamodb.bootstrap.ParallelScanExecutor; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; +import com.amazonaws.dynamodb.bootstrap.consumer.AbstractLogConsumer; import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; import com.amazonaws.services.dynamodbv2.model.ReturnConsumedCapacity; import com.amazonaws.services.dynamodbv2.model.ScanRequest; @@ -32,7 +37,7 @@ * consumer to accept the results. 
*/ public class DynamoDBBootstrapWorker extends AbstractLogProvider { - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; private final double rateLimit; private final String tableName; private final int numSegments; @@ -43,16 +48,13 @@ public class DynamoDBBootstrapWorker extends AbstractLogProvider { /** * Creates the DynamoDBBootstrapWorker, calculates the number of segments a * table should have, and creates a thread pool to prepare to scan. - * + * * @throws Exception */ - public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, - double rateLimit, String tableName, ExecutorService exec, - int section, int totalSections, int numSegments, - boolean consistentScan) throws SectionOutOfRangeException { + public DynamoDBBootstrapWorker(AmazonDynamoDB client, double rateLimit, String tableName, ExecutorService exec, int section, int totalSections, int numSegments, + boolean consistentScan) throws SectionOutOfRangeException { if (section > totalSections - 1 || section < 0) { - throw new SectionOutOfRangeException( - "Section of scan must be within [0...totalSections-1]"); + throw new SectionOutOfRangeException("Section of scan must be within [0...totalSections-1]"); } this.client = client; @@ -71,22 +73,19 @@ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, * Creates the DynamoDBBootstrapWorker, calculates the number of segments a * table should have, and creates a thread pool to prepare to scan using an * eventually consistent scan. 
- * + * * @throws Exception */ - public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, - double rateLimit, String tableName, int numThreads) - throws NullReadCapacityException { + public DynamoDBBootstrapWorker(AmazonDynamoDB client, double rateLimit, String tableName, int numThreads) throws NullReadCapacityException { this.client = client; this.rateLimit = rateLimit; this.tableName = tableName; - TableDescription description = client.describeTable(tableName) - .getTable(); + TableDescription description = client.describeTable(tableName).getTable(); this.section = 0; this.totalSections = 1; this.consistentScan = false; - this.numSegments = getNumberOfSegments(description); + this.numSegments = estimateNumberOfSegments(description); int numProcessors = Runtime.getRuntime().availableProcessors() * 4; if (numProcessors > numThreads) { numThreads = numProcessors; @@ -98,19 +97,13 @@ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, * Begins to pipe the log results by parallel scanning the table and the * consumer writing the results. 
*/ - public void pipe(final AbstractLogConsumer consumer) - throws ExecutionException, InterruptedException { - final DynamoDBTableScan scanner = new DynamoDBTableScan(rateLimit, - client); + public void pipe(final AbstractLogConsumer consumer) throws ExecutionException, InterruptedException { + final DynamoDBTableScan scanner = new DynamoDBTableScan(rateLimit, client); - final ScanRequest request = new ScanRequest().withTableName(tableName) - .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL) - .withLimit(BootstrapConstants.SCAN_LIMIT) - .withConsistentRead(consistentScan); + final ScanRequest request = new ScanRequest().withTableName(tableName).withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL).withLimit(BootstrapConstants.SCAN_LIMIT) + .withConsistentRead(consistentScan); - final ParallelScanExecutor scanService = scanner - .getParallelScanCompletionService(request, numSegments, - threadPool, section, totalSections); + final ParallelScanExecutor scanService = scanner.getParallelScanCompletionService(request, numSegments, threadPool, section, totalSections); while (!scanService.finished()) { SegmentedScanResult result = scanService.grab(); @@ -128,28 +121,22 @@ public void pipe(final AbstractLogConsumer consumer) * table, which should need many more segments in order to scan the table * fast enough in parallel so that one worker does not finish long before * other workers. - * - * @throws NullReadCapacityException - * if the table returns a null readCapacity units. + * + * @throws NullReadCapacityException if the table returns a null readCapacity units. 
*/ - public static int getNumberOfSegments(TableDescription description) - throws NullReadCapacityException { - ProvisionedThroughputDescription provisionedThroughput = description - .getProvisionedThroughput(); - double tableSizeInGigabytes = Math.ceil(description.getTableSizeBytes() - / BootstrapConstants.GIGABYTE); + public static int estimateNumberOfSegments(TableDescription description) throws NullReadCapacityException { + ProvisionedThroughputDescription provisionedThroughput = description.getProvisionedThroughput(); + double tableSizeInGigabytes = Math.ceil(description.getTableSizeBytes() / BootstrapConstants.GIGABYTE); Long readCapacity = provisionedThroughput.getReadCapacityUnits(); Long writeCapacity = provisionedThroughput.getWriteCapacityUnits(); if (writeCapacity == null) { writeCapacity = 1L; } if (readCapacity == null) { - throw new NullReadCapacityException( - "Cannot scan with a null readCapacity provisioned throughput"); + throw new NullReadCapacityException("Cannot scan with a null readCapacity provisioned throughput"); } double throughput = (readCapacity + 3 * writeCapacity) / 3000.0; - return (int) (10 * Math.max(Math.ceil(throughput), - Math.ceil(tableSizeInGigabytes) / 10)); + return 10 * ((int) Math.max(Math.ceil(throughput), Math.ceil(tableSizeInGigabytes) / 10)); } } diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBConsumerWorker.java similarity index 91% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBConsumerWorker.java index 432ddc9..9e43e89 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBConsumerWorker.java @@ -12,16 +12,16 @@ * express or implied. 
See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import java.util.Iterator; -import java.util.List; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemResult; import com.amazonaws.services.dynamodbv2.model.ConsumedCapacity; @@ -33,7 +33,7 @@ */ public class DynamoDBConsumerWorker implements Callable { - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; private final RateLimiter rateLimiter; private long exponentialBackoffTime; private BatchWriteItemRequest batch; @@ -44,9 +44,7 @@ public class DynamoDBConsumerWorker implements Callable { * table. If the write returns unprocessed items it will exponentially back * off until it succeeds. 
*/ - public DynamoDBConsumerWorker(BatchWriteItemRequest batchWriteItemRequest, - AmazonDynamoDBClient client, RateLimiter rateLimiter, - String tableName) { + public DynamoDBConsumerWorker(BatchWriteItemRequest batchWriteItemRequest, AmazonDynamoDB client, RateLimiter rateLimiter, String tableName) { this.batch = batchWriteItemRequest; this.client = client; this.rateLimiter = rateLimiter; @@ -84,8 +82,7 @@ public List runWithBackoff(BatchWriteItemRequest req) { do { writeItemResult = client.batchWriteItem(req); unprocessedItems = writeItemResult.getUnprocessedItems(); - consumedCapacities - .addAll(writeItemResult.getConsumedCapacity()); + consumedCapacities.addAll(writeItemResult.getConsumedCapacity()); if (unprocessedItems != null) { req.setRequestItems(unprocessedItems); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/ScanSegmentWorker.java similarity index 81% rename from src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java rename to src/main/java/com/amazonaws/dynamodb/bootstrap/worker/ScanSegmentWorker.java index c496e75..dce9f46 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/worker/ScanSegmentWorker.java @@ -12,12 +12,14 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import java.util.concurrent.Callable; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; -import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.dynamodb.bootstrap.items.ItemSizeCalculator; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.model.ConsumedCapacity; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.amazonaws.services.dynamodbv2.model.ScanResult; @@ -27,18 +29,16 @@ * This class executes multiple scan requests on one segment of a table in * series, as a runnable. Instances meant to be used as tasks of the worker * thread pool for parallel scans. - * */ public class ScanSegmentWorker implements Callable { private final ScanRequest request; private boolean hasNext; private int lastConsumedCapacity; private long exponentialBackoffTime; - private final AmazonDynamoDBClient client; + private final AmazonDynamoDB client; private final RateLimiter rateLimiter; - ScanSegmentWorker(final AmazonDynamoDBClient client, - final RateLimiter rateLimiter, ScanRequest request) { + public ScanSegmentWorker(final AmazonDynamoDB client, final RateLimiter rateLimiter, ScanRequest request) { this.request = request; this.client = client; this.rateLimiter = rateLimiter; @@ -59,20 +59,16 @@ public SegmentedScanResult call() { final ConsumedCapacity cc = result.getConsumedCapacity(); if (cc != null && cc.getCapacityUnits() != null) { - lastConsumedCapacity = result.getConsumedCapacity() - .getCapacityUnits().intValue(); + lastConsumedCapacity = result.getConsumedCapacity().getCapacityUnits().intValue(); } else if (result.getScannedCount() != null && result.getCount() != null) { final boolean isConsistent = request.getConsistentRead(); - int itemSize = isConsistent ? 
BootstrapConstants.STRONGLY_CONSISTENT_READ_ITEM_SIZE - : BootstrapConstants.EVENTUALLY_CONSISTENT_READ_ITEM_SIZE; + int itemSize = isConsistent ? BootstrapConstants.STRONGLY_CONSISTENT_READ_ITEM_SIZE : BootstrapConstants.EVENTUALLY_CONSISTENT_READ_ITEM_SIZE; - lastConsumedCapacity = (result.getScannedCount() / (int) Math.max(1.0, result.getCount())) - * (ItemSizeCalculator.calculateScanResultSizeInBytes(result) / itemSize); + lastConsumedCapacity = (result.getScannedCount() / (int) Math.max(1.0, result.getCount())) * (ItemSizeCalculator.calculateScanResultSizeInBytes(result) / itemSize); } - if (result.getLastEvaluatedKey() != null - && !result.getLastEvaluatedKey().isEmpty()) { + if (result.getLastEvaluatedKey() != null && !result.getLastEvaluatedKey().isEmpty()) { hasNext = true; request.setExclusiveStartKey(result.getLastEvaluatedKey()); } else { diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterfaceTests.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterfaceTests.java new file mode 100644 index 0000000..4ef152b --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterfaceTests.java @@ -0,0 +1,21 @@ +/* + * Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.dynamodb.bootstrap; + +/** + * Created by amcp on 2017/04/23. 
+ */ +public class CommandLineInterfaceTests { +} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java index 2572606..7f1940f 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java @@ -31,24 +31,23 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import com.amazonaws.dynamodb.bootstrap.worker.ScanSegmentWorker; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; import com.amazonaws.services.dynamodbv2.model.ScanRequest; import com.google.common.util.concurrent.RateLimiter; /** * Unit Tests for DynamoDBTableScan - * */ @RunWith(PowerMockRunner.class) -@PrepareForTest({ RateLimiter.class, DynamoDBTableScan.class }) +@PrepareForTest({RateLimiter.class, DynamoDBTableScan.class}) @PowerMockIgnore("javax.management.*") public class DynamoDBTableScanTest { private static String tableName = "testTableName"; private static Integer totalSegments = 1; private static Integer segment = 0; - private static ScanRequest req = new ScanRequest().withTableName(tableName) - .withTotalSegments(totalSegments).withSegment(segment); + private static ScanRequest req = new ScanRequest().withTableName(tableName).withTotalSegments(totalSegments).withSegment(segment); private double rateLimit = 12.3; /** @@ -56,8 +55,7 @@ public class DynamoDBTableScanTest { * make sure it creates the correct number of segments */ @Test - public void testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments() - throws Exception { + public void testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments() throws Exception { int segments = 0; ExecutorService mockExec = createMock(ExecutorService.class); mockStatic(RateLimiter.class); @@ -70,22 +68,16 @@ public void 
testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments( ParallelScanExecutor mockScanExecutor = createMock(ParallelScanExecutor.class); ScanSegmentWorker mockSegmentWorker = createMock(ScanSegmentWorker.class); - expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, req) - .andReturn(mockSegmentWorker); - expectNew(ParallelScanExecutor.class, mockExec, 1).andReturn( - mockScanExecutor); + expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, req).andReturn(mockSegmentWorker); + expectNew(ParallelScanExecutor.class, mockExec, 1).andReturn(mockScanExecutor); mockScanExecutor.addWorker(mockSegmentWorker, 0); int segments2 = 3; - ScanRequest testReq = scanner.copyScanRequest(req).withTotalSegments( - segments2); - expectNew(ParallelScanExecutor.class, mockExec, segments2).andReturn( - mockScanExecutor); + ScanRequest testReq = scanner.copyScanRequest(req).withTotalSegments(segments2); + expectNew(ParallelScanExecutor.class, mockExec, segments2).andReturn(mockScanExecutor); for (int i = 0; i < segments2; i++) { - expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, - scanner.copyScanRequest(testReq).withSegment(i)).andReturn( - mockSegmentWorker); + expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, scanner.copyScanRequest(testReq).withSegment(i)).andReturn(mockSegmentWorker); mockScanExecutor.addWorker(mockSegmentWorker, i); } diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResultTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResultTest.java index f11b209..ebf9896 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResultTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResultTest.java @@ -14,11 +14,11 @@ */ package com.amazonaws.dynamodb.bootstrap; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; import org.junit.Test; -import 
com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.services.dynamodbv2.model.ScanResult; /** @@ -33,8 +33,7 @@ public class SegmentedScanResultTest { public void test() { ScanResult result = new ScanResult(); int numSegments = 3; - SegmentedScanResult segmentedScanResult = new SegmentedScanResult( - result, numSegments); + SegmentedScanResult segmentedScanResult = new SegmentedScanResult(result, numSegments); assertSame(result, segmentedScanResult.getScanResult()); assertEquals(numSegments, segmentedScanResult.getSegment()); diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverterTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverterTest.java new file mode 100644 index 0000000..7b53ea1 --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/TableDescriptionToCreateTableRequestConverterTest.java @@ -0,0 +1,222 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazonaws.dynamodb.bootstrap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; +import java.util.SortedSet; +import java.util.TreeSet; + +import org.junit.Test; + +import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; +import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; +import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; +import com.amazonaws.services.dynamodbv2.model.KeyType; +import com.amazonaws.services.dynamodbv2.model.LocalSecondaryIndexDescription; +import com.amazonaws.services.dynamodbv2.model.Projection; +import com.amazonaws.services.dynamodbv2.model.ProjectionType; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; +import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription; +import com.amazonaws.services.dynamodbv2.model.ScalarAttributeType; +import com.amazonaws.services.dynamodbv2.model.StreamSpecification; +import com.amazonaws.services.dynamodbv2.model.StreamViewType; +import com.amazonaws.services.dynamodbv2.model.TableDescription; +import com.google.common.collect.Lists; + +/** + * Created by amcp on 2017/04/23. 
+ */ +public class TableDescriptionToCreateTableRequestConverterTest { + + public static final String ORDER_ID = "order_id"; + public static final String CUSTOMER_ID = "customer_id"; + public static final String INPUT_TABLE = "InputTable"; + public static final String OUTPUT_TABLE = "OutputTable"; + public static final ProvisionedThroughput DEFAULT_PT = new ProvisionedThroughput(1L, 4L); + + + static class NoLsiNoGsi { + static final List attributeDefinitionList = + Lists.newArrayList(new AttributeDefinition(ORDER_ID, ScalarAttributeType.S), new AttributeDefinition(CUSTOMER_ID, ScalarAttributeType.S)); + static final List keySchemata = Lists.newArrayList(new KeySchemaElement(CUSTOMER_ID, KeyType.HASH), new KeySchemaElement(ORDER_ID, KeyType.RANGE)); + static final ProvisionedThroughputDescription pt = new ProvisionedThroughputDescription().withReadCapacityUnits(1L).withWriteCapacityUnits(4L); + static final TableDescription description = + new TableDescription().withAttributeDefinitions(NoLsiNoGsi.attributeDefinitionList).withKeySchema(NoLsiNoGsi.keySchemata).withProvisionedThroughput(NoLsiNoGsi.pt) + .withTableName(INPUT_TABLE); + } + + @Test + public void apply_whenNoGsiButIncludeAllGsi_andNoLsiButIncludeAllLsi_andNoStreamButIncludeStream() { + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(new TreeSet<>()).lsiToInclude(new TreeSet<>()).newTableName(OUTPUT_TABLE).createAllGsi(true) + .createAllLsi(true).copyStreamSpecification(true).build(); + CreateTableRequest ctr = converter.apply(NoLsiNoGsi.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(NoLsiNoGsi.attributeDefinitionList, NoLsiNoGsi.description.getAttributeDefinitions()); + assertEquals(NoLsiNoGsi.keySchemata, NoLsiNoGsi.description.getKeySchema()); + assertNull(ctr.getLocalSecondaryIndexes()); + 
assertNull(ctr.getGlobalSecondaryIndexes()); + assertNull(ctr.getStreamSpecification()); + } + + @Test + public void apply_whenNoGsiButDontIncludeAllGsi_andNoLsiButDontIncludeAllLsi_andNoStreamButDontIncludeStream() { + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(new TreeSet<>()).lsiToInclude(new TreeSet<>()).newTableName(OUTPUT_TABLE).createAllGsi(false) + .createAllLsi(false).copyStreamSpecification(false).build(); + CreateTableRequest ctr = converter.apply(NoLsiNoGsi.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(NoLsiNoGsi.attributeDefinitionList, NoLsiNoGsi.description.getAttributeDefinitions()); + assertEquals(NoLsiNoGsi.keySchemata, NoLsiNoGsi.description.getKeySchema()); + assertNull(ctr.getLocalSecondaryIndexes()); + assertNull(ctr.getGlobalSecondaryIndexes()); + assertNull(ctr.getStreamSpecification()); + } + + private static final String GSI_NAME_ONE = "gsi1"; + private static final String GSI_NAME_TWO = "gsi2"; + private static final String LSI_NAME_ONE = "lsi1"; + private static final String LSI_NAME_TWO = "lsi2"; + private static final String ORDER_TS_MILLIS = "order_ts_millis"; + private static final String ORDER_DATE = "order_date"; + + + static class TwoLsiTwoGsiStream { + static final List attributeDefinitionList = Lists + .newArrayList(new AttributeDefinition(ORDER_ID, ScalarAttributeType.S), new AttributeDefinition(CUSTOMER_ID, ScalarAttributeType.S), + new AttributeDefinition(ORDER_DATE, ScalarAttributeType.N), new AttributeDefinition(ORDER_TS_MILLIS, ScalarAttributeType.N)); + static final List baseKeySchema = Lists.newArrayList(new KeySchemaElement(CUSTOMER_ID, KeyType.HASH), new KeySchemaElement(ORDER_ID, KeyType.RANGE)); + static final List gsiKeySchema = Lists.newArrayList(new KeySchemaElement(ORDER_DATE, KeyType.HASH), new KeySchemaElement(ORDER_TS_MILLIS, 
KeyType.RANGE)); + static final List lsiKeySchema = + Lists.newArrayList(new KeySchemaElement(CUSTOMER_ID, KeyType.HASH), new KeySchemaElement(ORDER_TS_MILLIS, KeyType.RANGE)); + static final ProvisionedThroughputDescription pt = new ProvisionedThroughputDescription().withReadCapacityUnits(1L).withWriteCapacityUnits(4L); + static final StreamSpecification streamSpecification = new StreamSpecification().withStreamEnabled(true).withStreamViewType(StreamViewType.NEW_AND_OLD_IMAGES); + static final TableDescription description = new TableDescription().withAttributeDefinitions(attributeDefinitionList).withKeySchema(baseKeySchema) + .withGlobalSecondaryIndexes(new GlobalSecondaryIndexDescription().withIndexName(GSI_NAME_ONE).withKeySchema(gsiKeySchema).withProvisionedThroughput(pt) + .withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY)), + new GlobalSecondaryIndexDescription().withIndexName(GSI_NAME_TWO).withKeySchema(gsiKeySchema).withProvisionedThroughput(pt) + .withProjection(new Projection().withProjectionType(ProjectionType.ALL))).withLocalSecondaryIndexes( + new LocalSecondaryIndexDescription().withIndexName(LSI_NAME_ONE).withKeySchema(lsiKeySchema) + .withProjection(new Projection().withProjectionType(ProjectionType.KEYS_ONLY)), + new LocalSecondaryIndexDescription().withIndexName(LSI_NAME_TWO).withKeySchema(lsiKeySchema) + .withProjection(new Projection().withProjectionType(ProjectionType.ALL))).withStreamSpecification(streamSpecification).withProvisionedThroughput(pt) + .withTableName(INPUT_TABLE); + } + + @Test + public void apply_whenTwoGsiAndIncludeAllGsi_andTwoLsiAndIncludeAllLsi_andStreamAndIncludeStream() { + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(new TreeSet<>()).lsiToInclude(new TreeSet<>()).newTableName(OUTPUT_TABLE).createAllGsi(true) + .createAllLsi(true).copyStreamSpecification(true).build(); + CreateTableRequest ctr = 
converter.apply(TwoLsiTwoGsiStream.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(TwoLsiTwoGsiStream.attributeDefinitionList, TwoLsiTwoGsiStream.description.getAttributeDefinitions()); + assertEquals(TwoLsiTwoGsiStream.baseKeySchema, TwoLsiTwoGsiStream.description.getKeySchema()); + + //LSI + assertNotNull(ctr.getLocalSecondaryIndexes()); + assertFalse(ctr.getLocalSecondaryIndexes().isEmpty()); + assertEquals(2, ctr.getLocalSecondaryIndexes().size()); + assertEquals(TwoLsiTwoGsiStream.lsiKeySchema, ctr.getLocalSecondaryIndexes().get(0).getKeySchema()); + assertEquals(ProjectionType.KEYS_ONLY.toString(), ctr.getLocalSecondaryIndexes().get(0).getProjection().getProjectionType()); + assertEquals(LSI_NAME_ONE, ctr.getLocalSecondaryIndexes().get(0).getIndexName()); + assertEquals(TwoLsiTwoGsiStream.lsiKeySchema, ctr.getLocalSecondaryIndexes().get(1).getKeySchema()); + assertEquals(ProjectionType.ALL.toString(), ctr.getLocalSecondaryIndexes().get(1).getProjection().getProjectionType()); + assertEquals(LSI_NAME_TWO, ctr.getLocalSecondaryIndexes().get(1).getIndexName()); + + //GSI + assertNotNull(ctr.getGlobalSecondaryIndexes()); + assertFalse(ctr.getGlobalSecondaryIndexes().isEmpty()); + assertEquals(2, ctr.getGlobalSecondaryIndexes().size()); + assertEquals(TwoLsiTwoGsiStream.gsiKeySchema, ctr.getGlobalSecondaryIndexes().get(0).getKeySchema()); + assertEquals(ProjectionType.KEYS_ONLY.toString(), ctr.getGlobalSecondaryIndexes().get(0).getProjection().getProjectionType()); + assertEquals(GSI_NAME_ONE, ctr.getGlobalSecondaryIndexes().get(0).getIndexName()); + assertEquals(DEFAULT_PT, ctr.getGlobalSecondaryIndexes().get(0).getProvisionedThroughput()); + assertEquals(TwoLsiTwoGsiStream.gsiKeySchema, ctr.getGlobalSecondaryIndexes().get(1).getKeySchema()); + assertEquals(ProjectionType.ALL.toString(), ctr.getGlobalSecondaryIndexes().get(1).getProjection().getProjectionType()); + 
assertEquals(GSI_NAME_TWO, ctr.getGlobalSecondaryIndexes().get(1).getIndexName()); + assertEquals(DEFAULT_PT, ctr.getGlobalSecondaryIndexes().get(1).getProvisionedThroughput()); + + //STREAM SPECIFICATION + assertNotNull(ctr.getStreamSpecification()); + assertTrue(ctr.getStreamSpecification().getStreamEnabled()); + assertEquals(StreamViewType.NEW_AND_OLD_IMAGES.toString(), ctr.getStreamSpecification().getStreamViewType()); + } + + @Test + public void apply_whenTwoGsiAndDontIncludeAllGsi_andTwoLsiAndDontIncludeAllLsi_andStreamAndDontIncludeStream() { + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(new TreeSet<>()).lsiToInclude(new TreeSet<>()).newTableName(OUTPUT_TABLE).createAllGsi(false) + .createAllLsi(false).copyStreamSpecification(false).build(); + CreateTableRequest ctr = converter.apply(TwoLsiTwoGsiStream.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(TwoLsiTwoGsiStream.attributeDefinitionList, TwoLsiTwoGsiStream.description.getAttributeDefinitions()); + assertEquals(TwoLsiTwoGsiStream.baseKeySchema, TwoLsiTwoGsiStream.description.getKeySchema()); + + //LSI + assertNull(ctr.getLocalSecondaryIndexes()); + + //GSI + assertNull(ctr.getGlobalSecondaryIndexes()); + + //STREAM SPECIFICATION + assertNull(ctr.getStreamSpecification()); + } + + @Test + public void apply_whenTwoGsiAndDontIncludeAllGsiButIncludeOne_andTwoLsiAndDontIncludeAllLsiButIncludeOne_andStreamAndDontIncludeStream() { + SortedSet gsiToInclude = new TreeSet<>(); + gsiToInclude.add(GSI_NAME_TWO); + SortedSet lsiToInclude = new TreeSet<>(); + lsiToInclude.add(LSI_NAME_TWO); + TableDescriptionToCreateTableRequestConverter converter = + TableDescriptionToCreateTableRequestConverter.builder().gsiToInclude(gsiToInclude).lsiToInclude(lsiToInclude).newTableName(OUTPUT_TABLE).createAllGsi(false) + 
.createAllLsi(false).copyStreamSpecification(false).build(); + CreateTableRequest ctr = converter.apply(TwoLsiTwoGsiStream.description); + assertEquals(OUTPUT_TABLE, ctr.getTableName()); + assertEquals(DEFAULT_PT, ctr.getProvisionedThroughput()); + assertEquals(TwoLsiTwoGsiStream.attributeDefinitionList, TwoLsiTwoGsiStream.description.getAttributeDefinitions()); + assertEquals(TwoLsiTwoGsiStream.baseKeySchema, TwoLsiTwoGsiStream.description.getKeySchema()); + + //LSI + assertNotNull(ctr.getLocalSecondaryIndexes()); + assertFalse(ctr.getLocalSecondaryIndexes().isEmpty()); + assertEquals(TwoLsiTwoGsiStream.lsiKeySchema, ctr.getLocalSecondaryIndexes().get(0).getKeySchema()); + assertEquals(ProjectionType.ALL.toString(), ctr.getLocalSecondaryIndexes().get(0).getProjection().getProjectionType()); + assertEquals(LSI_NAME_TWO, ctr.getLocalSecondaryIndexes().get(0).getIndexName()); + + //GSI + assertNotNull(ctr.getGlobalSecondaryIndexes()); + assertFalse(ctr.getGlobalSecondaryIndexes().isEmpty()); + assertEquals(TwoLsiTwoGsiStream.gsiKeySchema, ctr.getGlobalSecondaryIndexes().get(0).getKeySchema()); + assertEquals(ProjectionType.ALL.toString(), ctr.getGlobalSecondaryIndexes().get(0).getProjection().getProjectionType()); + assertEquals(GSI_NAME_TWO, ctr.getGlobalSecondaryIndexes().get(0).getIndexName()); + assertEquals(DEFAULT_PT, ctr.getGlobalSecondaryIndexes().get(0).getProvisionedThroughput()); + + //STREAM SPECIFICATION + assertNull(ctr.getStreamSpecification()); + } +} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumerTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumerTest.java similarity index 83% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumerTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumerTest.java index e8e2bb5..f17ad71 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueConsumerTest.java +++ 
b/src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/BlockingQueueConsumerTest.java @@ -12,9 +12,14 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; -import static org.junit.Assert.*; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.powermock.api.easymock.PowerMock.createMock; +import static org.powermock.api.easymock.PowerMock.mockStatic; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; @@ -26,14 +31,10 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import static org.powermock.api.easymock.PowerMock.*; -import static org.easymock.EasyMock.expect; - -import com.amazonaws.dynamodb.bootstrap.BlockingQueueConsumer; +import com.amazonaws.dynamodb.bootstrap.items.DynamoDBEntryWithSize; /** * Unit Tests for LogStashExecutor - * */ @RunWith(PowerMockRunner.class) @PrepareForTest(BlockingQueueConsumer.class) @@ -52,8 +53,7 @@ public void testInitializeAndShutdown() { mockStatic(Executors.class); ExecutorService mockThreadPool = createMock(ExecutorService.class); - expect(Executors.newFixedThreadPool(totalThreads)).andReturn( - mockThreadPool); + expect(Executors.newFixedThreadPool(totalThreads)).andReturn(mockThreadPool); BlockingQueue queue = logExec.getQueue(); diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumerTest.java similarity index 82% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumerTest.java index 
9ae7113..c438b94 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumerTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/consumer/DynamoDBConsumerTest.java @@ -12,23 +12,24 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. */ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.consumer; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verifyAll; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.Map; -import static org.powermock.api.easymock.PowerMock.*; - import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PowerMockIgnore; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.BatchWriteItemRequest; @@ -36,7 +37,6 @@ /** * Unit tests for DynamoDBConsumerWorker - * */ @RunWith(PowerMockRunner.class) @PrepareForTest(DynamoDBConsumer.class) @@ -56,8 +56,7 @@ public void splitResultIntoBatchesTest() { List> items = new LinkedList>(); for (int i = 0; i < numItems; i++) { Map sampleScanResult = new HashMap(); - sampleScanResult.put("key", new AttributeValue("attribute value " - + i)); + sampleScanResult.put("key", new AttributeValue("attribute value " + i)); items.add(sampleScanResult); } scanResult.setItems(items); @@ -65,10 +64,8 @@ public void splitResultIntoBatchesTest() { SegmentedScanResult result = new SegmentedScanResult(scanResult, 0); replayAll(); - List batches = 
DynamoDBConsumer - .splitResultIntoBatches(result.getScanResult(), tableName); - assertEquals(Math.ceil(numItems / BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM), - batches.size(), 0.0); + List batches = DynamoDBConsumer.splitResultIntoBatches(result.getScanResult(), tableName); + assertEquals(Math.ceil(numItems / BootstrapConstants.MAX_BATCH_SIZE_WRITE_ITEM), batches.size(), 0.0); verifyAll(); } diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java new file mode 100644 index 0000000..354618d --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToAnother.java @@ -0,0 +1,43 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package com.amazonaws.dynamodb.bootstrap.example; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +import com.amazonaws.dynamodb.bootstrap.consumer.DynamoDBConsumer; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +class TransferDataFromOneTableToAnother { + public static void main(String[] args) { + final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + try { + // 100.0 read operations per second. 4 threads to scan the table. + final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client, 100.0, "mySourceTable", 4); + // 50.0 write operations per second. 8 threads to write to the destination table. + final DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", 50.0, Executors.newFixedThreadPool(8)); + worker.pipe(consumer); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } catch (ExecutionException | InterruptedException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } + } +} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java new file mode 100644 index 0000000..c5bb967 --- /dev/null +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/example/TransferDataFromOneTableToBlockingQueue.java @@ -0,0 +1,41 @@ +/* + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.dynamodb.bootstrap.example; + +import java.util.concurrent.ExecutionException; + +import com.amazonaws.dynamodb.bootstrap.consumer.BlockingQueueConsumer; +import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException; +import com.amazonaws.dynamodb.bootstrap.worker.DynamoDBBootstrapWorker; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; + +class TransferDataFromOneTableToBlockingQueue { + public static void main(String[] args) { + final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().withRegion(com.amazonaws.regions.Regions.US_WEST_1).build(); + try { + // 100.0 read operations per second. 4 threads to scan the table. 
+ final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client, 100.0, "mySourceTable", 4); + final BlockingQueueConsumer consumer = new BlockingQueueConsumer(8); + worker.pipe(consumer); + } catch (NullReadCapacityException e) { + System.err.println("The DynamoDB source table returned a null read capacity."); + System.exit(1); + } catch (ExecutionException | InterruptedException e) { + System.err.println("Encountered exception when executing transfer: " + e.getMessage()); + System.exit(1); + } + } +} diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixInTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixInTest.java similarity index 87% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixInTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixInTest.java index 86230fb..a7da451 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixInTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/items/AttributeValueMixInTest.java @@ -12,9 +12,9 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.items; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; import java.util.HashMap; import java.util.LinkedList; @@ -30,7 +30,6 @@ /** * Unit test for AttributeValueMixIn - * */ public class AttributeValueMixInTest { @@ -57,17 +56,14 @@ public void testReturnsCapitalSWithMixin() throws JsonProcessingException { ObjectMapper mapperWith = new ObjectMapper(); mapperWith.setSerializationInclusion(Include.NON_NULL); - mapperWith.addMixInAnnotations(AttributeValue.class, - AttributeValueMixIn.class); + mapperWith.addMixIn(AttributeValue.class, AttributeValueMixIn.class); - String withMixIn = mapperWith.writeValueAsString(sampleScanResult() - .get(0)); + String withMixIn = mapperWith.writeValueAsString(sampleScanResult().get(0)); ObjectMapper mapperWithout = new ObjectMapper(); mapperWithout.setSerializationInclusion(Include.NON_NULL); - String withoutMixIn = mapperWithout - .writeValueAsString(sampleScanResult().get(0)); + String withoutMixIn = mapperWithout.writeValueAsString(sampleScanResult().get(0)); assertTrue(withMixIn.contains(capitalS)); assertTrue(withoutMixIn.contains(lowercaseS)); diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorkerTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorkerTest.java similarity index 80% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorkerTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorkerTest.java index ee2d2a0..95ce49d 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorkerTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/worker/BlockingQueueWorkerTest.java @@ -12,9 +12,14 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; -import static org.junit.Assert.*; +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.powermock.api.easymock.PowerMock.createMock; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verifyAll; import java.util.HashMap; import java.util.LinkedList; @@ -29,40 +34,32 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import static org.powermock.api.easymock.PowerMock.*; -import static org.easymock.EasyMock.expect; - -import com.amazonaws.dynamodb.bootstrap.BlockingQueueWorker; import com.amazonaws.dynamodb.bootstrap.SegmentedScanResult; +import com.amazonaws.dynamodb.bootstrap.items.DynamoDBEntryWithSize; import com.amazonaws.services.dynamodbv2.model.AttributeValue; import com.amazonaws.services.dynamodbv2.model.ScanResult; /** * Unit Tests for LogStashQueueWorker - * */ @RunWith(PowerMockRunner.class) @PrepareForTest(BlockingQueueWorker.class) @PowerMockIgnore("javax.management.*") public class BlockingQueueWorkerTest { - + /** * Test the initialization of a BlockingQueueWorker and make sure it places the items in the queue when called. 
*/ @Test public void testInitializationAndCall() { ScanResult mockResult = createMock(ScanResult.class); - SegmentedScanResult segmentedScanResult = new SegmentedScanResult( - mockResult, 0); - BlockingQueue queue = new ArrayBlockingQueue( - 20); - BlockingQueueWorker callable = new BlockingQueueWorker(queue, - segmentedScanResult); + SegmentedScanResult segmentedScanResult = new SegmentedScanResult(mockResult, 0); + BlockingQueue queue = new ArrayBlockingQueue(20); + BlockingQueueWorker callable = new BlockingQueueWorker(queue, segmentedScanResult); List> items = new LinkedList>(); Map sampleScanResult = new HashMap(); - sampleScanResult.put("sample key", new AttributeValue( - "sample attribute value")); + sampleScanResult.put("sample key", new AttributeValue("sample attribute value")); items.add(sampleScanResult); expect(mockResult.getItems()).andReturn(items); diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorkerTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorkerTest.java similarity index 88% rename from src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorkerTest.java rename to src/test/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorkerTest.java index 3e91fe9..453431b 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorkerTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/worker/DynamoDBBootstrapWorkerTest.java @@ -12,11 +12,13 @@ * express or implied. See the License for the specific language governing * permissions and limitations under the License. 
*/ -package com.amazonaws.dynamodb.bootstrap; +package com.amazonaws.dynamodb.bootstrap.worker; import static org.easymock.EasyMock.expect; import static org.junit.Assert.assertTrue; import static org.powermock.api.easymock.PowerMock.createMock; +import static org.powermock.api.easymock.PowerMock.replayAll; +import static org.powermock.api.easymock.PowerMock.verifyAll; import java.util.concurrent.ThreadPoolExecutor; @@ -26,14 +28,11 @@ import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; -import static org.powermock.api.easymock.PowerMock.*; - import com.amazonaws.dynamodb.bootstrap.exception.SectionOutOfRangeException; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; /** * Unit Tests for DynamoDBBootstrapWorker - * */ @RunWith(PowerMockRunner.class) @PrepareForTest(DynamoDBBootstrapWorker.class) @@ -62,12 +61,11 @@ public void testInitialization() throws Exception { replayAll(); - new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, - mockThreadPool, 0, 1, 10, false); + new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, mockThreadPool, 0, 1, 10, false); verifyAll(); } - + /** * Test the initialization of a DynamoDBBootstrapWorker with an invalid section. 
*/ @@ -78,10 +76,9 @@ public void testInitializationInvalidSection() throws Exception { replayAll(); boolean exceptionThrown = false; - try{ - new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, - mockThreadPool, 1, 1, 10, false); - }catch (SectionOutOfRangeException e){ + try { + new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, mockThreadPool, 1, 1, 10, false); + } catch (SectionOutOfRangeException e) { exceptionThrown = true; } @@ -99,8 +96,7 @@ public void testShutdownWithoutWaiting() throws Exception { expect(mockThreadPool.shutdownNow()).andReturn(null); replayAll(); - DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker( - mockClient, rateLimit, tableName, mockThreadPool, 0, 1, 10, false); + DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(mockClient, rateLimit, tableName, mockThreadPool, 0, 1, 10, false); worker.shutdown(false); verifyAll();