Skip to content

Commit

Permalink
CNDB-12781: POC: Read from storage in parallel when reading from mult…
Browse files Browse the repository at this point in the history
…iple SSTables
  • Loading branch information
eolivelli committed Feb 7, 2025
1 parent e0d10be commit cba4c55
Showing 1 changed file with 26 additions and 0 deletions.
26 changes: 26 additions & 0 deletions src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

Expand Down Expand Up @@ -61,6 +62,11 @@
public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery
{
protected static final SelectionDeserializer selectionDeserializer = new Deserializer();
private static final int MIN_ITERATOR_TO_PARALLELIZE_READS = Integer.getInteger("cassandra.read_min_ssatable_to_parallelize_reads", 2);
static
{
logger.debug("Setting cassandra.read_min_ssatable_to_parallelize_reads to {}", MIN_ITERATOR_TO_PARALLELIZE_READS);
}

private final DecoratedKey partitionKey;
private final ClusteringIndexFilter clusteringIndexFilter;
Expand Down Expand Up @@ -746,6 +752,26 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
StorageHook.instance.reportRead(cfs.metadata().id, partitionKey());

List<UnfilteredRowIterator> iterators = inputCollector.finalizeIterators(cfs, nowInSec(), controller.oldestUnrepairedTombstone());
if (MIN_ITERATOR_TO_PARALLELIZE_READS > 0 && iterators.size() > MIN_ITERATOR_TO_PARALLELIZE_READS)
{
try
{
// hasNext triggers the read from storage, we can parallelize this with the Storage Service
List<CompletableFuture<?>> handles = new ArrayList<>();
for (UnfilteredRowIterator it : iterators)
{
handles.add(CompletableFuture.runAsync(it::hasNext));
}
for (CompletableFuture<?> h : handles)
{
h.get(1, TimeUnit.MINUTES);
}
} catch (Throwable error)
{
logger.error("Failed to parallelize read for partition {}", partitionKey, error);
// we can ignore this error, it will be caught later in the pipeline
}
}
return withSSTablesIterated(iterators, controller, view.sstables.size(), cfs.metric, metricsCollector, startTimeNanos);
}
catch (RuntimeException | Error e)
Expand Down

0 comments on commit cba4c55

Please sign in to comment.