diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e2a3069 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +*~ diff --git a/pom.xml b/pom.xml index a0360bb..367924c 100644 --- a/pom.xml +++ b/pom.xml @@ -1,7 +1,7 @@ 4.0.0 com.amazonaws - 1.0.1 + 1.0.2 dynamodb-import-export-tool jar DynamoDB Import Export Tool diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java index 960a59d..dc2654a 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogConsumer.java @@ -42,7 +42,7 @@ public abstract class AbstractLogConsumer { /** * Writes the result of a scan to another endpoint asynchronously. Will call * getWorker to determine what job to submit with the result. - * + * * @param * the SegmentedScanResult to asynchronously write to another * endpoint. @@ -51,7 +51,7 @@ public abstract class AbstractLogConsumer { /** * Shuts the thread pool down. - * + * * @param * If true, this method waits for the threads in the pool to * finish. If false, this thread pool shuts down without @@ -77,4 +77,4 @@ public void shutdown(boolean awaitTermination) { threadPool.shutdownNow(); } } -} \ No newline at end of file +} diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java index 40c4883..75bdd6f 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/AbstractLogProvider.java @@ -45,7 +45,7 @@ public abstract void pipe(final AbstractLogConsumer consumer) /** * Shuts the thread pool down. - * + * * @param * If true, this method waits for the threads in the pool to * finish. If false, this thread pool shuts down without diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java index 31c2c6e..0547787 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/AttributeValueMixIn.java @@ -24,7 +24,7 @@ /** * Mixin for attribute values to stay all capital when mapping them as strings. - * + * */ public abstract class AttributeValueMixIn { @JsonProperty("S") public abstract String getS(); @@ -47,4 +47,4 @@ public abstract class AttributeValueMixIn { @JsonProperty("M") public abstract void setM(Map val); @JsonProperty("L") public abstract List getL(); @JsonProperty("L") public abstract void setL(List val); -} \ No newline at end of file +} diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java index b819042..0535eba 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/BlockingQueueWorker.java @@ -75,4 +75,4 @@ public Void call() { } return null; } -} \ No newline at end of file +} diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java index 1b41a91..ce34811 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineArgs.java @@ -101,7 +101,7 @@ public int getTotalSections() { public int getSection() { return section; } - + public static final String CONSISTENT_SCAN = "--consistentScan"; @Parameter(names = CONSISTENT_SCAN, description = "Use this flag to use strongly consistent scan. If the flag is not used it will default to eventually consistent scan") private boolean consistentScan = false; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java index 67639fc..953f19f 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/CommandLineInterface.java @@ -48,7 +48,7 @@ public class CommandLineInterface { /** * Main class to begin transferring data from one DynamoDB table to another * DynamoDB table. - * + * * @param args */ public static void main(String[] args) { @@ -107,6 +107,9 @@ public static void main(String[] args) { final double writeThroughput = calculateThroughput( writeTableDescription, writeThroughputRatio, false); + LOGGER.info("Calculated read throughput: " + readThroughput); + LOGGER.info("Calculated write throughput: " + writeThroughput); + try { ExecutorService sourceExec = getSourceThreadPool(numSegments); ExecutorService destinationExec = getDestinationThreadPool(maxWriteThreads); diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java index 10278ef..b2864a9 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBBootstrapWorker.java @@ -43,7 +43,7 @@ public class DynamoDBBootstrapWorker extends AbstractLogProvider { /** * Creates the DynamoDBBootstrapWorker, calculates the number of segments a * table should have, and creates a thread pool to prepare to scan. - * + * * @throws Exception */ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, @@ -71,7 +71,7 @@ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, * Creates the DynamoDBBootstrapWorker, calculates the number of segments a * table should have, and creates a thread pool to prepare to scan using an * eventually consistent scan. - * + * * @throws Exception */ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client, @@ -128,7 +128,7 @@ public void pipe(final AbstractLogConsumer consumer) * table, which should need many more segments in order to scan the table * fast enough in parallel so that one worker does not finish long before * other workers. - * + * * @throws NullReadCapacityException * if the table returns a null readCapacity units. */ diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java index a5bfa6c..09c0991 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBConsumer.java @@ -111,4 +111,4 @@ public static List splitResultIntoBatches( } return batches; } -} \ No newline at end of file +} diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java index 98ee9ea..2137304 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBEntryWithSize.java @@ -38,4 +38,4 @@ public int getSize() { public Map getEntry() { return entry; } -} \ No newline at end of file +} diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java index 7ebe248..8695289 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScan.java @@ -39,10 +39,10 @@ public DynamoDBTableScan(double rateLimit, AmazonDynamoDBClient client) { /** * This function copies a scan request for the number of segments and then * adds those workers to the executor service to begin scanning. - * + * * @param totalSections * @param section - * + * * @return the parallel scan executor to grab results * when a segment is finished. */ @@ -50,8 +50,6 @@ public ParallelScanExecutor getParallelScanCompletionService( ScanRequest initialRequest, int numSegments, Executor executor, int section, int totalSections) { final int segments = Math.max(1, numSegments); - final ParallelScanExecutor completion = new ParallelScanExecutor( - executor, segments); int sectionSize = segments / totalSections; int start = sectionSize * section; @@ -59,6 +57,9 @@ public ParallelScanExecutor getParallelScanCompletionService( if (section + 1 == totalSections) { end = segments; } + final int numberToComplete = end - start; + final ParallelScanExecutor completion = new ParallelScanExecutor( + executor, segments, numberToComplete); for (int segment = start; segment < end; segment++) { ScanRequest scanSegment = copyScanRequest(initialRequest) diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java index c965548..7e0473c 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/ItemSizeCalculator.java @@ -25,7 +25,7 @@ /** * Class used to calculate the size of a DynamoDB item in bytes. - * + * */ public class ItemSizeCalculator { @@ -47,7 +47,7 @@ public static int calculateItemSizeInBytes(Map item) { } return size; } - + public static int calculateScanResultSizeInBytes(ScanResult result) { final Iterator> it = result.getItems().iterator(); int totalBytes = 0; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java index 3424221..65e702d 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/ParallelScanExecutor.java @@ -24,18 +24,20 @@ * This class executes multiple scan requests on one segment of a table in * series, as a runnable. Instances meant to be used as tasks of the worker * thread pool for parallel scans. - * + * */ public class ParallelScanExecutor { private final BitSet finished; private final ScanSegmentWorker[] workers; private final ExecutorCompletionService exec; + private final int numberToComplete; - public ParallelScanExecutor(Executor executor, int segments) { + public ParallelScanExecutor(Executor executor, int segments, int numberToComplete) { this.exec = new ExecutorCompletionService(executor); this.finished = new BitSet(segments); this.finished.clear(); this.workers = new ScanSegmentWorker[segments]; + this.numberToComplete = numberToComplete; } /** @@ -56,14 +58,14 @@ public void finishSegment(int segment) { */ public boolean finished() { synchronized (finished) { - return finished.cardinality() == workers.length; + return finished.cardinality() == numberToComplete; } } /** * This method gets a segmentedScanResult and submits the next scan request * for that segment, if there is one. - * + * * @return the next available ScanResult * @throws ExecutionException * if one of the segment pages threw while executing diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java index c496e75..e8e366a 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/ScanSegmentWorker.java @@ -27,7 +27,7 @@ * This class executes multiple scan requests on one segment of a table in * series, as a runnable. Instances meant to be used as tasks of the worker * thread pool for parallel scans. - * + * */ public class ScanSegmentWorker implements Callable { private final ScanRequest request; @@ -43,7 +43,6 @@ public class ScanSegmentWorker implements Callable { this.client = client; this.rateLimiter = rateLimiter; this.hasNext = true; - this.exponentialBackoffTime = BootstrapConstants.INITIAL_RETRY_TIME_MILLISECONDS; lastConsumedCapacity = 256; } @@ -62,7 +61,6 @@ public SegmentedScanResult call() { lastConsumedCapacity = result.getConsumedCapacity() .getCapacityUnits().intValue(); } else if (result.getScannedCount() != null && result.getCount() != null) { - final boolean isConsistent = request.getConsistentRead(); int itemSize = isConsistent ? BootstrapConstants.STRONGLY_CONSISTENT_READ_ITEM_SIZE : BootstrapConstants.EVENTUALLY_CONSISTENT_READ_ITEM_SIZE; @@ -91,6 +89,7 @@ public SegmentedScanResult call() { public ScanResult runWithBackoff() { ScanResult result = null; boolean interrupted = false; + exponentialBackoffTime = BootstrapConstants.INITIAL_RETRY_TIME_MILLISECONDS; try { do { try { @@ -101,7 +100,9 @@ public ScanResult runWithBackoff() { } catch (InterruptedException ie) { interrupted = true; } finally { - exponentialBackoffTime *= 2; + exponentialBackoffTime = Math.min( + BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME, + exponentialBackoffTime * 2); } continue; } @@ -113,4 +114,4 @@ public ScanResult runWithBackoff() { } } } -} \ No newline at end of file +} diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java index 6f5b356..b91bd58 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/SegmentedScanResult.java @@ -18,7 +18,7 @@ /** * Encapsulates segment number in scan result - * + * */ public class SegmentedScanResult { private final ScanResult result; diff --git a/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java b/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java index abcc53f..c5b6483 100644 --- a/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java +++ b/src/main/java/com/amazonaws/dynamodb/bootstrap/constants/BootstrapConstants.java @@ -59,11 +59,6 @@ public class BootstrapConstants { */ public static final long MAX_EXPONENTIAL_BACKOFF_TIME = 2048; - /** - * Max amount of retries before exiting or throwing exception. - */ - public static final int MAX_RETRIES = 10; - /** * Initial retry time for an exponential back-off call in milliseconds. */ diff --git a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java b/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java index 2572606..e27ca37 100644 --- a/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java +++ b/src/test/java/com/amazonaws/dynamodb/bootstrap/DynamoDBTableScanTest.java @@ -37,7 +37,7 @@ /** * Unit Tests for DynamoDBTableScan - * + * */ @RunWith(PowerMockRunner.class) @PrepareForTest({ RateLimiter.class, DynamoDBTableScan.class }) @@ -72,7 +72,7 @@ public void testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments( expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, req) .andReturn(mockSegmentWorker); - expectNew(ParallelScanExecutor.class, mockExec, 1).andReturn( + expectNew(ParallelScanExecutor.class, mockExec, 1, 1).andReturn( mockScanExecutor); mockScanExecutor.addWorker(mockSegmentWorker, 0); @@ -80,9 +80,20 @@ public void testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments( int segments2 = 3; ScanRequest testReq = scanner.copyScanRequest(req).withTotalSegments( segments2); - expectNew(ParallelScanExecutor.class, mockExec, segments2).andReturn( + + // [0,3) with 2 workers is split into [0,1) and [1,3) + expectNew(ParallelScanExecutor.class, mockExec, segments2, 1).andReturn( + mockScanExecutor); + for (int i = 0; i < 1; i++) { + expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, + scanner.copyScanRequest(testReq).withSegment(i)).andReturn( + mockSegmentWorker); + mockScanExecutor.addWorker(mockSegmentWorker, i); + } + + expectNew(ParallelScanExecutor.class, mockExec, segments2, 2).andReturn( mockScanExecutor); - for (int i = 0; i < segments2; i++) { + for (int i = 1; i < segments2; i++) { expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, scanner.copyScanRequest(testReq).withSegment(i)).andReturn( mockSegmentWorker); @@ -91,7 +102,8 @@ public void testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments( replayAll(); scanner.getParallelScanCompletionService(req, segments, mockExec, 0, 1); - scanner.getParallelScanCompletionService(req, segments2, mockExec, 0, 1); + scanner.getParallelScanCompletionService(req, segments2, mockExec, 0, 2); + scanner.getParallelScanCompletionService(req, segments2, mockExec, 1, 2); verifyAll(); }