This repository was archived by the owner on Jun 18, 2020. It is now read-only.

Fix multinode export. #4

Open
wants to merge 1 commit into base: master
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
/target
*~
2 changes: 1 addition & 1 deletion pom.xml
@@ -1,7 +1,7 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.amazonaws</groupId>
<version>1.0.1</version>
<version>1.0.2</version>
<artifactId>dynamodb-import-export-tool</artifactId>
<packaging>jar</packaging>
<name>DynamoDB Import Export Tool</name>
@@ -42,7 +42,7 @@ public abstract class AbstractLogConsumer {
/**
* Writes the result of a scan to another endpoint asynchronously. Will call
* getWorker to determine what job to submit with the result.
*
*
* @param <result>
* the SegmentedScanResult to asynchronously write to another
* endpoint.
@@ -51,7 +51,7 @@ public abstract class AbstractLogConsumer {

/**
* Shuts the thread pool down.
*
*
* @param <awaitTermination>
* If true, this method waits for the threads in the pool to
* finish. If false, this thread pool shuts down without
@@ -77,4 +77,4 @@ public void shutdown(boolean awaitTermination) {
threadPool.shutdownNow();
}
}
}
}
@@ -45,7 +45,7 @@ public abstract void pipe(final AbstractLogConsumer consumer)

/**
* Shuts the thread pool down.
*
*
* @param <awaitTermination>
* If true, this method waits for the threads in the pool to
* finish. If false, this thread pool shuts down without
@@ -24,7 +24,7 @@

/**
* Mixin for attribute values to stay all capital when mapping them as strings.
*
*
*/
public abstract class AttributeValueMixIn {
@JsonProperty("S") public abstract String getS();
@@ -47,4 +47,4 @@ public abstract class AttributeValueMixIn {
@JsonProperty("M") public abstract void setM(Map<String, AttributeValue> val);
@JsonProperty("L") public abstract List<AttributeValue> getL();
@JsonProperty("L") public abstract void setL(List<AttributeValue> val);
}
}
@@ -75,4 +75,4 @@ public Void call() {
}
return null;
}
}
}
@@ -101,7 +101,7 @@ public int getTotalSections() {
public int getSection() {
return section;
}

public static final String CONSISTENT_SCAN = "--consistentScan";
@Parameter(names = CONSISTENT_SCAN, description = "Use this flag to use strongly consistent scan. If the flag is not used it will default to eventually consistent scan")
private boolean consistentScan = false;
@@ -48,7 +48,7 @@ public class CommandLineInterface {
/**
* Main class to begin transferring data from one DynamoDB table to another
* DynamoDB table.
*
*
* @param args
*/
public static void main(String[] args) {
@@ -107,6 +107,9 @@ public static void main(String[] args) {
final double writeThroughput = calculateThroughput(
writeTableDescription, writeThroughputRatio, false);

LOGGER.info("Calculated read throughput: " + readThroughput);
LOGGER.info("Calculated write throughput: " + writeThroughput);

try {
ExecutorService sourceExec = getSourceThreadPool(numSegments);
ExecutorService destinationExec = getDestinationThreadPool(maxWriteThreads);
@@ -43,7 +43,7 @@ public class DynamoDBBootstrapWorker extends AbstractLogProvider {
/**
* Creates the DynamoDBBootstrapWorker, calculates the number of segments a
* table should have, and creates a thread pool to prepare to scan.
*
*
* @throws Exception
*/
public DynamoDBBootstrapWorker(AmazonDynamoDBClient client,
@@ -71,7 +71,7 @@ public DynamoDBBootstrapWorker(AmazonDynamoDBClient client,
* Creates the DynamoDBBootstrapWorker, calculates the number of segments a
* table should have, and creates a thread pool to prepare to scan using an
* eventually consistent scan.
*
*
* @throws Exception
*/
public DynamoDBBootstrapWorker(AmazonDynamoDBClient client,
@@ -128,7 +128,7 @@ public void pipe(final AbstractLogConsumer consumer)
* table, which should need many more segments in order to scan the table
* fast enough in parallel so that one worker does not finish long before
* other workers.
*
*
* @throws NullReadCapacityException
* if the table returns a null readCapacity units.
*/
@@ -111,4 +111,4 @@ public static List<BatchWriteItemRequest> splitResultIntoBatches(
}
return batches;
}
}
}
@@ -38,4 +38,4 @@ public int getSize() {
public Map<String, AttributeValue> getEntry() {
return entry;
}
}
}
@@ -39,26 +39,27 @@ public DynamoDBTableScan(double rateLimit, AmazonDynamoDBClient client) {
/**
* This function copies a scan request for the number of segments and then
* adds those workers to the executor service to begin scanning.
*
*
* @param totalSections
* @param section
*
*
* @return <ParallelScanExecutor> the parallel scan executor to grab results
* when a segment is finished.
*/
public ParallelScanExecutor getParallelScanCompletionService(
ScanRequest initialRequest, int numSegments, Executor executor,
int section, int totalSections) {
final int segments = Math.max(1, numSegments);
final ParallelScanExecutor completion = new ParallelScanExecutor(
executor, segments);

int sectionSize = segments / totalSections;
int start = sectionSize * section;
int end = start + sectionSize;
if (section + 1 == totalSections) {
end = segments;
}
final int numberToComplete = end - start;
final ParallelScanExecutor completion = new ParallelScanExecutor(
executor, segments, numberToComplete);

for (int segment = start; segment < end; segment++) {
ScanRequest scanSegment = copyScanRequest(initialRequest)
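Note on the change above: getParallelScanCompletionService now carves the table's scan segments into per-node sections, so each node in a multinode export only submits workers for its own slice of the segment range. The following standalone sketch mirrors that range arithmetic; it is an illustration only, and the class and method names (SectionSplitSketch, segmentRange) are hypothetical, not part of the tool.

// Standalone sketch (illustration only) of the section-splitting arithmetic above.
class SectionSplitSketch {
    // Returns {start, end}: the half-open segment range owned by the given section.
    static int[] segmentRange(int segments, int section, int totalSections) {
        int sectionSize = segments / totalSections;
        int start = sectionSize * section;
        int end = start + sectionSize;
        if (section + 1 == totalSections) {
            end = segments; // the last section absorbs any remainder
        }
        return new int[] { start, end };
    }

    public static void main(String[] args) {
        // 3 segments split across 2 nodes: section 0 scans [0,1), section 1 scans [1,3).
        for (int section = 0; section < 2; section++) {
            int[] range = segmentRange(3, section, 2);
            System.out.println("section " + section + " -> segments ["
                    + range[0] + "," + range[1] + ")");
        }
    }
}

For 3 segments and 2 sections this yields [0,1) for section 0 and [1,3) for section 1, which matches the expectations added to the unit test further down.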
@@ -25,7 +25,7 @@

/**
* Class used to calculate the size of a DynamoDB item in bytes.
*
*
*/
public class ItemSizeCalculator {

@@ -47,7 +47,7 @@ public static int calculateItemSizeInBytes(Map<String, AttributeValue> item) {
}
return size;
}

public static int calculateScanResultSizeInBytes(ScanResult result) {
final Iterator<Map<String, AttributeValue>> it = result.getItems().iterator();
int totalBytes = 0;
@@ -24,18 +24,20 @@
* This class executes multiple scan requests on one segment of a table in
* series, as a runnable. Instances meant to be used as tasks of the worker
* thread pool for parallel scans.
*
*
*/
public class ParallelScanExecutor {
private final BitSet finished;
private final ScanSegmentWorker[] workers;
private final ExecutorCompletionService<SegmentedScanResult> exec;
private final int numberToComplete;

public ParallelScanExecutor(Executor executor, int segments) {
public ParallelScanExecutor(Executor executor, int segments, int numberToComplete) {
this.exec = new ExecutorCompletionService<SegmentedScanResult>(executor);
this.finished = new BitSet(segments);
this.finished.clear();
this.workers = new ScanSegmentWorker[segments];
this.numberToComplete = numberToComplete;
}

/**
@@ -56,14 +58,14 @@ public void finishSegment(int segment) {
*/
public boolean finished() {
synchronized (finished) {
return finished.cardinality() == workers.length;
return finished.cardinality() == numberToComplete;
}
}

/**
* This method gets a segmentedScanResult and submits the next scan request
* for that segment, if there is one.
*
*
* @return the next available ScanResult
* @throws ExecutionException
* if one of the segment pages threw while executing
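Note on the change above: the executor's BitSet is still sized for every segment of the table, but a node in a multinode export only submits workers for its own section, so finished() must compare against the number actually submitted (numberToComplete) rather than workers.length. A minimal sketch of that completion check follows; the class name and structure are hypothetical, not the tool's own API.

import java.util.BitSet;

// Hypothetical illustration of the completion check discussed above.
class CompletionCheckSketch {
    private final BitSet finished;       // sized for all segments of the table
    private final int numberToComplete;  // segments this node actually submitted

    CompletionCheckSketch(int segments, int numberToComplete) {
        this.finished = new BitSet(segments);
        this.numberToComplete = numberToComplete;
    }

    synchronized void finishSegment(int segment) {
        finished.set(segment);
    }

    synchronized boolean finished() {
        // Comparing against the total segment count instead would never return
        // true for a node that owns only a subset of the segment range.
        return finished.cardinality() == numberToComplete;
    }

    public static void main(String[] args) {
        // A node that owns segments [1,3) of a 3-segment table.
        CompletionCheckSketch check = new CompletionCheckSketch(3, 2);
        check.finishSegment(1);
        check.finishSegment(2);
        System.out.println(check.finished()); // prints true
    }
}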
@@ -27,7 +27,7 @@
* This class executes multiple scan requests on one segment of a table in
* series, as a runnable. Instances meant to be used as tasks of the worker
* thread pool for parallel scans.
*
*
*/
public class ScanSegmentWorker implements Callable<SegmentedScanResult> {
private final ScanRequest request;
@@ -43,7 +43,6 @@ public class ScanSegmentWorker implements Callable<SegmentedScanResult> {
this.client = client;
this.rateLimiter = rateLimiter;
this.hasNext = true;
this.exponentialBackoffTime = BootstrapConstants.INITIAL_RETRY_TIME_MILLISECONDS;
lastConsumedCapacity = 256;
}

@@ -62,7 +61,6 @@ public SegmentedScanResult call() {
lastConsumedCapacity = result.getConsumedCapacity()
.getCapacityUnits().intValue();
} else if (result.getScannedCount() != null && result.getCount() != null) {

final boolean isConsistent = request.getConsistentRead();
int itemSize = isConsistent ? BootstrapConstants.STRONGLY_CONSISTENT_READ_ITEM_SIZE
: BootstrapConstants.EVENTUALLY_CONSISTENT_READ_ITEM_SIZE;
@@ -91,6 +89,7 @@ public SegmentedScanResult call() {
public ScanResult runWithBackoff() {
ScanResult result = null;
boolean interrupted = false;
exponentialBackoffTime = BootstrapConstants.INITIAL_RETRY_TIME_MILLISECONDS;
try {
do {
try {
@@ -101,7 +100,9 @@ public ScanResult runWithBackoff() {
} catch (InterruptedException ie) {
interrupted = true;
} finally {
exponentialBackoffTime *= 2;
exponentialBackoffTime = Math.min(
BootstrapConstants.MAX_EXPONENTIAL_BACKOFF_TIME,
exponentialBackoffTime * 2);
}
continue;
}
@@ -113,4 +114,4 @@
}
}
}
}
}
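Note on the change above: the retry delay is now reset at the top of each runWithBackoff call and, after each failed attempt, doubled only up to MAX_EXPONENTIAL_BACKOFF_TIME. A minimal sketch of that capped back-off loop follows; the initial delay value and the attempt callable are placeholders, not the tool's actual scan call.

import java.util.concurrent.Callable;

// Sketch of a capped exponential back-off loop mirroring the shape of runWithBackoff above.
class BackoffSketch {
    static final long INITIAL_RETRY_TIME_MILLISECONDS = 128;  // assumed starting delay
    static final long MAX_EXPONENTIAL_BACKOFF_TIME = 2048;    // cap, as in BootstrapConstants

    static <T> T runWithBackoff(Callable<T> attempt) throws Exception {
        long backoff = INITIAL_RETRY_TIME_MILLISECONDS; // reset per call, not per worker
        while (true) {
            try {
                return attempt.call();
            } catch (RuntimeException e) {
                Thread.sleep(backoff);
                // Double the delay, but never beyond the configured ceiling.
                backoff = Math.min(MAX_EXPONENTIAL_BACKOFF_TIME, backoff * 2);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Succeeds on the third attempt; the first two failures back off 128 ms, then 256 ms.
        final int[] calls = {0};
        String result = runWithBackoff(() -> {
            if (++calls[0] < 3) throw new RuntimeException("throttled");
            return "ok";
        });
        System.out.println(result);
    }
}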
@@ -18,7 +18,7 @@

/**
* Encapsulates segment number in scan result
*
*
*/
public class SegmentedScanResult {
private final ScanResult result;
@@ -59,11 +59,6 @@ public class BootstrapConstants {
*/
public static final long MAX_EXPONENTIAL_BACKOFF_TIME = 2048;

/**
* Max amount of retries before exiting or throwing exception.
*/
public static final int MAX_RETRIES = 10;

/**
* Initial retry time for an exponential back-off call in milliseconds.
*/
@@ -37,7 +37,7 @@

/**
* Unit Tests for DynamoDBTableScan
*
*
*/
@RunWith(PowerMockRunner.class)
@PrepareForTest({ RateLimiter.class, DynamoDBTableScan.class })
@@ -72,17 +72,28 @@ public void testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments(

expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter, req)
.andReturn(mockSegmentWorker);
expectNew(ParallelScanExecutor.class, mockExec, 1).andReturn(
expectNew(ParallelScanExecutor.class, mockExec, 1, 1).andReturn(
mockScanExecutor);

mockScanExecutor.addWorker(mockSegmentWorker, 0);

int segments2 = 3;
ScanRequest testReq = scanner.copyScanRequest(req).withTotalSegments(
segments2);
expectNew(ParallelScanExecutor.class, mockExec, segments2).andReturn(

// [0,3) with 2 workers is split into [0,1) and [1,3)
expectNew(ParallelScanExecutor.class, mockExec, segments2, 1).andReturn(
mockScanExecutor);
for (int i = 0; i < 1; i++) {
expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter,
scanner.copyScanRequest(testReq).withSegment(i)).andReturn(
mockSegmentWorker);
mockScanExecutor.addWorker(mockSegmentWorker, i);
}

expectNew(ParallelScanExecutor.class, mockExec, segments2, 2).andReturn(
mockScanExecutor);
for (int i = 0; i < segments2; i++) {
for (int i = 1; i < segments2; i++) {
expectNew(ScanSegmentWorker.class, mockClient, mockRateLimiter,
scanner.copyScanRequest(testReq).withSegment(i)).andReturn(
mockSegmentWorker);
@@ -91,7 +102,8 @@ public void testGetParallelExecutorCompletionServiceWithVariousNumberOfSegments(

replayAll();
scanner.getParallelScanCompletionService(req, segments, mockExec, 0, 1);
scanner.getParallelScanCompletionService(req, segments2, mockExec, 0, 1);
scanner.getParallelScanCompletionService(req, segments2, mockExec, 0, 2);
scanner.getParallelScanCompletionService(req, segments2, mockExec, 1, 2);
verifyAll();
}
