diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 4b6663f7a294..8b05377a5104 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -61,6 +62,11 @@ public class SinglePartitionReadCommand extends ReadCommand implements SinglePartitionReadQuery { protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); + private static final int MIN_ITERATOR_TO_PARALLELIZE_READS = Integer.getInteger("cassandra.read_min_ssatable_to_parallelize_reads", 2); + static + { + logger.debug("Setting cassandra.read_min_ssatable_to_parallelize_reads to {}", MIN_ITERATOR_TO_PARALLELIZE_READS); + } private final DecoratedKey partitionKey; private final ClusteringIndexFilter clusteringIndexFilter; @@ -746,6 +752,26 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs StorageHook.instance.reportRead(cfs.metadata().id, partitionKey()); List iterators = inputCollector.finalizeIterators(cfs, nowInSec(), controller.oldestUnrepairedTombstone()); + if (MIN_ITERATOR_TO_PARALLELIZE_READS > 0 && iterators.size() > MIN_ITERATOR_TO_PARALLELIZE_READS) + { + try + { + // hasNext triggers the read from storage, we can parallelize this with the Storage Service + List> handles = new ArrayList<>(); + for (UnfilteredRowIterator it : iterators) + { + handles.add(CompletableFuture.runAsync(it::hasNext)); + } + for (CompletableFuture h : handles) + { + h.get(1, TimeUnit.MINUTES); + } + } catch (Throwable error) + { + logger.error("Failed to parallelize read for partition {}", partitionKey, error); + // we can ignore this error, it will be caught later in the pipeline + } + } return withSSTablesIterated(iterators, controller, view.sstables.size(), cfs.metric, metricsCollector, startTimeNanos); } catch (RuntimeException | Error e)