Skip to content
This repository was archived by the owner on Jun 18, 2020. It is now read-only.

Make use of bounded queues everywhere #8

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws</groupId>
<version>1.0.1</version>
<version>1.0.2-SNAPSHOT</version>
<artifactId>dynamodb-import-export-tool</artifactId>
<packaging>jar</packaging>
<name>DynamoDB Import Export Tool</name>
<url>https://github.com/awslabs/dynamodb-import-export-tool</url>
<description>Exports DynamoDB items via parallel scan into a blocking queue, then consumes the queue and imports DynamoDB items into a replica table using asynchronous writes.</description>
<scm>
<url>https://github.com/awslabs/dynamodb-import-export-tool.git</url>
<tag>1.0.2-SNAPSHOT</tag>
</scm>
<properties>
<aws.java.sdk.version>1.10.10</aws.java.sdk.version>
Original file line number Diff line number Diff line change
@@ -14,8 +14,10 @@
*/
package com.amazonaws.dynamodb.bootstrap;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

@@ -30,20 +32,33 @@
*/
public abstract class AbstractLogConsumer {

public ExecutorCompletionService<Void> exec;
protected ExecutorService threadPool;
// only keep a reference to the thread pool because we need to be able to shut it down
private ExecutorService threadPool;
protected ExecutorCompletionService<Void> exec;

/**
* Logger for the AbstractLogConsumer.
*/
private static final Logger LOGGER = LogManager
.getLogger(AbstractLogConsumer.class);



protected AbstractLogConsumer(int numThreads) {
this.threadPool = Executors.newFixedThreadPool(numThreads);
this.exec = new ExecutorCompletionService<Void>(threadPool, new ArrayBlockingQueue<Future<Void>>(numThreads));
}

/**
 * Creates a consumer from an already-built completion service and the
 * thread pool that goes with it. Both references are stored as given.
 *
 * @param exec
 *            the completion service stored in {@code this.exec}.
 * @param threadPool
 *            the executor service stored in {@code this.threadPool}.
 */
protected AbstractLogConsumer(ExecutorCompletionService<Void> exec, ExecutorService threadPool) {
    this.exec = exec;
    this.threadPool = threadPool;
}

/**
* Writes the result of a scan to another endpoint asynchronously. Will call
* getWorker to determine what job to submit with the result.
*
* @param <result>
* @param result
* the SegmentedScanResult to asynchronously write to another
* endpoint.
*/
@@ -52,7 +67,7 @@ public abstract class AbstractLogConsumer {
/**
* Shuts the thread pool down.
*
* @param <awaitTermination>
* @param awaitTermination
* If true, this method waits for the threads in the pool to
* finish. If false, this thread pool shuts down without
* finishing their current tasks.
Original file line number Diff line number Diff line change
@@ -33,13 +33,8 @@ public class BlockingQueueConsumer extends AbstractLogConsumer {
private BlockingQueue<DynamoDBEntryWithSize> queue;

public BlockingQueueConsumer(int numThreads) {
super(Math.max(numThreads, Runtime.getRuntime().availableProcessors()));
this.queue = new ArrayBlockingQueue<DynamoDBEntryWithSize>(20);
int numProcessors = Runtime.getRuntime().availableProcessors();
if (numProcessors > numThreads) {
numThreads = numProcessors;
}
this.threadPool = Executors.newFixedThreadPool(numThreads);
this.exec = new ExecutorCompletionService<Void>(threadPool);
}

@Override
Original file line number Diff line number Diff line change
@@ -91,6 +91,7 @@ public static void main(String[] args) {

TableDescription readTableDescription = sourceClient.describeTable(
sourceTable).getTable();

TableDescription writeTableDescription = destinationClient
.describeTable(destinationTable).getTable();
int numSegments = 10;
@@ -107,11 +108,27 @@ public static void main(String[] args) {
final double writeThroughput = calculateThroughput(
writeTableDescription, writeThroughputRatio, false);

// Estimate how many BatchWriteItem calls the configured write throughput can
// sustain in parallel, and warn when maxWriteThreads exceeds that estimate.
final Long sourceItemCount = readTableDescription.getItemCount();
final Long sourceTableSizeBytes = readTableDescription.getTableSizeBytes();
final double averageItemSize;
if (sourceItemCount == null || sourceTableSizeBytes == null
        || sourceItemCount.longValue() <= 0L) {
    // Empty table or missing statistics: no average can be computed.
    averageItemSize = 0.0;
} else {
    averageItemSize = sourceTableSizeBytes.doubleValue() / sourceItemCount.doubleValue();
}
// A write costs at least one WCU per item. The floor of 1.0 also keeps the
// divisor below non-zero when the source table is empty.
final double averageWcuPerItem = Math.max(1.0, Math.ceil(averageItemSize / 1024.0));
// 25 is the maximum number of items in a single BatchWriteItem request.
final double averageWcuPerBatchWriteItem = averageWcuPerItem * 25;
final long supportedBatchWrites = (long) Math.ceil(writeThroughput / averageWcuPerBatchWriteItem);
// At least one call can always be in flight (lower bound of 1, not an upper
// bound), and the value is clamped so the narrowing cast cannot overflow.
final int parallelBatchWriteItems =
        (int) Math.min((long) Integer.MAX_VALUE, Math.max(1L, supportedBatchWrites));
if (maxWriteThreads > parallelBatchWriteItems) {
    LOGGER.warn("Expected WCU per BatchWriteItem call is " + averageWcuPerBatchWriteItem
            + " so this configuration could support up to " + parallelBatchWriteItems
            + " parallel BatchWriteItem calls. However, maxWriteThreads(" + maxWriteThreads
            + ") was greater than this expectation.");
}

try {
ExecutorService sourceExec = getSourceThreadPool(numSegments);
ExecutorService destinationExec = getDestinationThreadPool(maxWriteThreads);
DynamoDBConsumer consumer = new DynamoDBConsumer(destinationClient,
destinationTable, writeThroughput, destinationExec);
destinationTable, writeThroughput, destinationExec, maxWriteThreads);

final DynamoDBBootstrapWorker worker = new DynamoDBBootstrapWorker(
sourceClient, readThroughput, sourceTable, sourceExec,
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -46,12 +47,11 @@ public class DynamoDBConsumer extends AbstractLogConsumer {
* Class to consume logs and write them to a DynamoDB table.
*/
public DynamoDBConsumer(AmazonDynamoDBClient client, String tableName,
double rateLimit, ExecutorService exec) {
double rateLimit, ExecutorService exec, int parallelBatchWriteItems) {
super(new ExecutorCompletionService<Void>(exec, new ArrayBlockingQueue<Future<Void>>(parallelBatchWriteItems)), exec);
this.client = client;
this.tableName = tableName;
this.rateLimiter = RateLimiter.create(rateLimit);
super.threadPool = exec;
super.exec = new ExecutorCompletionService<Void>(threadPool);
}

/**