This repository was archived by the owner on Jun 18, 2020. It is now read-only.

Update SDK. Allow creating destination if it does not exist. #7

Open · wants to merge 2 commits into base: master
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
/.idea
/target
/*.iml
.*~
*~
224 changes: 140 additions & 84 deletions README.md
@@ -1,113 +1,169 @@
# DynamoDB Import Export Tool
The DynamoDB Import Export Tool is designed to perform parallel scans on the source table, store scan results in a queue, then consume the queue by writing the items asynchronously to a destination table.
The DynamoDB Import Export Tool is designed to perform parallel scans on the source table,
store scan results in a queue, then consume the queue by writing the items asynchronously to a destination table.

## Requirements ##
* Maven
* JRE 1.7+
* Pre-existing source and destination DynamoDB tables
* Pre-existing source DynamoDB table. The destination table is optional in the CLI; you can choose to create the
destination table if it does not exist.

## Running as an executable

1. Build the library:

```
mvn install
1. Build the library with `mvn install`. This produces the tool's jar in the target/ directory.
The CLI usage is shown below; required parameters are marked with an asterisk.

```bash
--consistentScan
Use this flag to use strongly consistent scan. If the flag is not used
it will default to eventually consistent scan
Default: false
--copyStreamSpecificationWhenCreating
Use the source table stream specification for the destination table
during its creation.
Default: false
--createAllGsi
Create all GSI in destination table
Default: false
--createAllLsi
Create all LSI in destination table
Default: false
--createDestination
Create destination table if it does not exist
Default: false
--destinationEndpoint
Endpoint of the destination table
* --destinationSigningRegion
Signing region for the destination endpoint
* --destinationTable
Name of the destination table
--help
Display usage information
--includeGsi
Include the following GSI in the destination table
--includeLsi
Include the following LSI in the destination table
--maxWriteThreads
Number of max threads to write to destination table
Default: 1024
* --readThroughputRatio
Percentage of total read throughput to scan the source table
Default: 0.0
--section
Section number to scan when running multiple programs concurrently [0,
1... totalSections-1]
Default: 0
--sourceEndpoint
Endpoint of the source table
* --sourceSigningRegion
Signing region for the source endpoint
* --sourceTable
Name of the source table
--totalSections
Total number of sections to divide the scan into
Default: 1
* --writeThroughputRatio
Percentage of total write throughput to write the destination table
Default: 0.0
```
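The two throughput-ratio parameters scale the tool's request rate against the table's provisioned capacity. As a rough illustration of the arithmetic only (the names here are hypothetical, not the library's internals): a ratio of 0.5 against a table provisioned at 1000 read capacity units targets about 500 reads per second.

```java
// Illustrative sketch of how a throughput ratio maps to a target request
// rate. The method and field names here are hypothetical; the tool's real
// rate limiting lives inside the library.
class ThroughputRatioExample {
    // Target request rate = provisioned capacity units * ratio.
    static double targetRate(double provisionedCapacity, double ratio) {
        return provisionedCapacity * ratio;
    }

    public static void main(String[] args) {
        // A table provisioned at 1000 RCU, scanned with
        // --readThroughputRatio 0.5, is read at roughly 500 requests/second.
        System.out.println(targetRate(1000.0, 0.5)); // prints 500.0
    }
}
```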

2. This produces the target jar in the target/ directory, to start the replication process:

java -jar dynamodb-import-export-tool.jar

--destinationEndpoint <destination_endpoint> // the DynamoDB endpoint where the destination table is located.

--destinationTable <destination_table> // the destination table to write to.

--sourceEndpoint <source_endpoint> // the endpoint where the source table is located.

--sourceTable <source_table>// the source table to read from.

--readThroughputRatio <ratio_in_decimal> // the ratio of read throughput to consume from the source table.

--writeThroughputRatio <ratio_in_decimal> // the ratio of write throughput to consume from the destination table.

--maxWriteThreads <numWriteThreads> // (Optional, default=128 * Available_Processors) Maximum number of write threads to create.

--totalSections <numSections> // (Optional, default=1) Total number of sections to split the bootstrap into. Each application will only scan and write one section.

--section <sectionSequence> // (Optional, default=0) section to read and write. Only will scan this one section of all sections, [0...totalSections-1].

--consistentScan <boolean> // (Optional, default=false) indicates whether consistent scan should be used when reading from the source table.
2. An example command you can use on one EC2 host to copy from one table `foo` in `us-east-1` to a new table
called `bar` in `us-east-2` follows.

```bash
java -jar target/dynamodb-import-export-tool-1.1.0.jar \
--sourceSigningRegion us-east-1 \
--sourceTable foo \
--destinationSigningRegion us-east-2 \
--destinationTable bar \
--readThroughputRatio 1 \
--writeThroughputRatio 1
```

> **NOTE**: To split the replication process across multiple machines, simply use the totalSections & section command line arguments, where each machine will run one section out of [0 ... totalSections-1].
> **NOTE**: To split the replication process across multiple machines, use the `totalSections` and `section`
> command line arguments, where each machine runs one section out of [0 ... totalSections-1].
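To make the note above concrete, here is a sketch of how two hosts could split the copy, assuming the same jar, tables, and regions as the earlier example command (each host runs exactly one section):

```bash
# Host 1: scans and writes the first half of the key space (section 0 of 2).
java -jar target/dynamodb-import-export-tool-1.1.0.jar \
    --sourceSigningRegion us-east-1 \
    --sourceTable foo \
    --destinationSigningRegion us-east-2 \
    --destinationTable bar \
    --readThroughputRatio 1 \
    --writeThroughputRatio 1 \
    --totalSections 2 \
    --section 0

# Host 2: identical command, but with --section 1 for the second half.
```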

## Using the API
Find some examples of how to use the Import-Export tool's API below.
The first demonstrates how to use the API to copy data from one DynamoDB table to another.
The second demonstrates how to enqueue the data in a DynamoDB table in a
`BlockingQueueConsumer` in memory.

### 1. Transfer Data from One DynamoDB Table to Another DynamoDB Table

The below example will read from "mySourceTable" at 100 reads per second, using 4 threads. And it will write to "myDestinationTable" at 50 writes per second, using 8 threads.
Both tables are located at "dynamodb.us-west-1.amazonaws.com". (to transfer to a different region, create 2 AmazonDynamoDBClients
The below example will read from "mySourceTable" at 100 reads per second, using four threads.
And it will write to "myDestinationTable" at 50 writes per second, using eight threads.
Both tables are located in the us-west-1 region.
To transfer to a different region, create two AmazonDynamoDB clients
with different regions to pass into the DynamoDBBootstrapWorker and the DynamoDBConsumer.

```java
AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());
client.setEndpoint("dynamodb.us-west-1.amazonaws.com");

DynamoDBBootstrapWorker worker = null;

try {
// 100.0 read operations per second. 4 threads to scan the table.
worker = new DynamoDBBootstrapWorker(client,
100.0, "mySourceTable", 4);
} catch (NullReadCapacityException e) {
LOGGER.error("The DynamoDB source table returned a null read capacity.", e);
System.exit(1);
}

// 50.0 write operations per second. 8 threads to scan the table.
DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable", 50.0, Executors.newFixedThreadPool(8));

try {
worker.pipe(consumer);
} catch (ExecutionException e) {
LOGGER.error("Encountered exception when executing transfer.", e);
System.exit(1);
} catch (InterruptedException e){
LOGGER.error("Interrupted when executing transfer.", e);
System.exit(1);
import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker;
import com.amazonaws.dynamodb.bootstrap.consumer.DynamoDBConsumer;
import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

class TransferDataFromOneTableToAnother {
public static void main(String[] args) {
final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
.withRegion(com.amazonaws.regions.Regions.US_WEST_1).build();
try {
// 100.0 read operations per second. 4 threads to scan the table.
final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client,
100.0, "mySourceTable", 4);
// 50.0 write operations per second. 8 threads to scan the table.
final DynamoDBConsumer consumer = new DynamoDBConsumer(client, "myDestinationTable",
50.0, Executors.newFixedThreadPool(8));
worker.pipe(consumer);
} catch (NullReadCapacityException e) {
System.err.println("The DynamoDB source table returned a null read capacity.");
System.exit(1);
} catch (ExecutionException | InterruptedException e) {
System.err.println("Encountered exception when executing transfer: " + e.getMessage());
System.exit(1);
}
}
}
```


### 2. Transfer Data from One DynamoDB Table to a Blocking Queue

The below example will read from a DynamoDB table and export to an array blocking queue. This is useful for when another application would like to consume
the DynamoDB entries but does not have a setup application for it. They can just retrieve the queue (consumer.getQueue()) and then continually pop() from it
The below example will read from a DynamoDB table and export to an array blocking queue.
This is useful when another application wants to consume
the DynamoDB entries but does not yet have its own consumer set up.
It can retrieve the queue (`consumer.getQueue()`) and continually poll it
to process the new entries.

```java
AmazonDynamoDBClient client = new AmazonDynamoDBClient(new ProfileCredentialsProvider());
client.setEndpoint("dynamodb.us-west-1.amazonaws.com");

DynamoDBBootstrapWorker worker = null;

try {
// 100.0 read operations per second. 4 threads to scan the table.
worker = new DynamoDBBootstrapWorker(client,
100.0, "mySourceTable", 4);
} catch (NullReadCapacityException e) {
LOGGER.error("The DynamoDB source table returned a null read capacity.", e);
System.exit(1);
}

BlockingQueueConsumer consumer = new BlockingQueueConsumer(8);

try {
worker.pipe(consumer);
} catch (ExecutionException e) {
LOGGER.error("Encountered exception when executing transfer.", e);
System.exit(1);
} catch (InterruptedException e){
LOGGER.error("Interrupted when executing transfer.", e);
System.exit(1);
import com.amazonaws.dynamodb.bootstrap.consumer.BlockingQueueConsumer;
import com.amazonaws.dynamodb.bootstrap.DynamoDBBootstrapWorker;
import com.amazonaws.dynamodb.bootstrap.exception.NullReadCapacityException;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;

import java.util.concurrent.ExecutionException;

class TransferDataFromOneTableToBlockingQueue {
public static void main(String[] args) {
final AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard()
.withRegion(com.amazonaws.regions.Regions.US_WEST_1).build();
try {
// 100.0 read operations per second. 4 threads to scan the table.
final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(client, 100.0,
"mySourceTable", 4);
final BlockingQueueConsumer consumer = new BlockingQueueConsumer(8);
worker.pipe(consumer);
} catch (NullReadCapacityException e) {
System.err.println("The DynamoDB source table returned a null read capacity.");
System.exit(1);
} catch (ExecutionException | InterruptedException e) {
System.err.println("Encountered exception when executing transfer: " + e.getMessage());
System.exit(1);
}
}
}
```
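The consuming side mentioned above — retrieve the queue, then continually poll it — can be sketched without any AWS types. The element type of the real `BlockingQueueConsumer` queue depends on the library version, so this sketch simulates it with a `BlockingQueue<String>`; only the drain pattern carries over.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hedged sketch of draining the consumer's queue; the real queue would come
// from consumer.getQueue() and hold scanned DynamoDB entries, simulated here
// as plain strings.
class DrainQueueSketch {
    // Drains whatever is currently queued and returns the number of entries
    // processed. A live pipeline would block on take() instead of stopping
    // when poll() returns null.
    static int drain(BlockingQueue<String> queue) {
        int processed = 0;
        String entry;
        while ((entry = queue.poll()) != null) {
            System.out.println("processing " + entry);
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
        queue.offer("item-1"); // stand-ins for scanned DynamoDB entries
        queue.offer("item-2");
        System.out.println(drain(queue)); // prints 2
    }
}
```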
29 changes: 19 additions & 10 deletions pom.xml
@@ -1,7 +1,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws</groupId>
<version>1.0.1</version>
<version>1.1.0</version>
<artifactId>dynamodb-import-export-tool</artifactId>
<packaging>jar</packaging>
<name>DynamoDB Import Export Tool</name>
@@ -11,14 +11,17 @@
<url>https://github.com/awslabs/dynamodb-import-export-tool.git</url>
</scm>
<properties>
<aws.java.sdk.version>1.10.10</aws.java.sdk.version>
<powermock.version>1.6.2</powermock.version>
<jcommander.version>1.48</jcommander.version>
<guava.version>15.0</guava.version>
<jdk.version>1.8</jdk.version>
<aws.java.sdk.version>1.11.123</aws.java.sdk.version>
<powermock.version>1.6.6</powermock.version>
<jcommander.version>1.69</jcommander.version>
<guava.version>21.0</guava.version>
<log4j.core.version>1.2.17</log4j.core.version>
<easymock.version>3.2</easymock.version>
<easymock.version>3.4</easymock.version>
<commons.logging.version>1.2</commons.logging.version>
<maven.shade.version>2.4.1</maven.shade.version>
<maven.shade.version>3.0.0</maven.shade.version>
<maven.compiler.version>3.0</maven.compiler.version>
<maven.gpg.version>1.6</maven.gpg.version>
<gpg.skip>true</gpg.skip>
</properties>
<developers>
@@ -84,6 +87,11 @@
<artifactId>log4j</artifactId>
<version>${log4j.core.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
@@ -109,14 +117,15 @@
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>${jdk.version}</source>
<target>${jdk.version}</target>
</configuration>
<version>3.0</version>
<version>${maven.compiler.version}</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-gpg-plugin</artifactId>
<version>${maven.gpg.version}</version>
<executions>
<execution>
<id>sign-artifacts</id>
@@ -22,6 +22,7 @@
import org.apache.log4j.Logger;

import com.amazonaws.dynamodb.bootstrap.constants.BootstrapConstants;
import com.amazonaws.dynamodb.bootstrap.consumer.AbstractLogConsumer;

/**
* Abstract class to send inputs from a source to a consumer.
@@ -31,25 +32,22 @@ public abstract class AbstractLogProvider {
/**
* Logger for the DynamoDBBootstrapWorker.
*/
private static final Logger LOGGER = LogManager
.getLogger(AbstractLogProvider.class);
private static final Logger LOGGER = LogManager.getLogger(AbstractLogProvider.class);

protected ExecutorService threadPool;

/**
* Begins to read log results and transfer them to the consumer who will
* write the results.
*/
public abstract void pipe(final AbstractLogConsumer consumer)
throws ExecutionException, InterruptedException;
public abstract void pipe(final AbstractLogConsumer consumer) throws ExecutionException, InterruptedException;

/**
* Shuts the thread pool down.
*
* @param <awaitTermination>
* If true, this method waits for the threads in the pool to
* finish. If false, this thread pool shuts down without
* finishing their current tasks.
*
* @param awaitTermination If true, this method waits for the threads in the pool to
* finish. If false, this thread pool shuts down without
* finishing their current tasks.
*/
public void shutdown(boolean awaitTermination) {
if (awaitTermination) {
@@ -61,8 +59,7 @@ public void shutdown(boolean awaitTermination) {
}
} catch (InterruptedException e) {
interrupted = true;
LOGGER.warn("Threadpool was interrupted when trying to shutdown: "
+ e.getMessage());
LOGGER.warn("Threadpool was interrupted when trying to shutdown: " + e.getMessage());
} finally {
if (interrupted)
Thread.currentThread().interrupt();