diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java index 4bb976908db9..32a58f85c60c 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java @@ -156,8 +156,11 @@ public TableMetadata metadata() long bytesRead() { - long[] bytesReadByLevel = this.bytesReadByLevel; - return Arrays.stream(bytesReadByLevel).reduce(Long::sum).orElse(0L); + long bytesScanned = 0L; + for (ISSTableScanner scanner : scanners) + bytesScanned += scanner.getBytesScanned(); + + return bytesScanned; } long bytesRead(int level) @@ -180,15 +183,6 @@ long totalSourceRows() return Arrays.stream(mergedRowsHistogram).reduce(0L, Long::sum); } - public long getTotalBytesScanned() - { - long bytesScanned = 0L; - for (ISSTableScanner scanner : scanners) - bytesScanned += scanner.getBytesScanned(); - - return bytesScanned; - } - public long getTotalCompressedSize() { long compressedSize = 0; diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 9e8746abd4ae..a649e8390901 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -712,7 +712,7 @@ void execute0() if (writer.append(partition)) totalKeysWritten++; - long bytesScanned = compactionIterator.getTotalBytesScanned(); + long bytesScanned = compactionIterator.bytesRead(); // Rate limit the scanners, and account for compression if (compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio)) diff --git a/test/unit/org/apache/cassandra/db/compaction/unified/BackgroundCompactionTrackingTest.java b/test/unit/org/apache/cassandra/db/compaction/unified/BackgroundCompactionTrackingTest.java index 44d96ec322c7..48fbee7f98a7 100644 --- a/test/unit/org/apache/cassandra/db/compaction/unified/BackgroundCompactionTrackingTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/unified/BackgroundCompactionTrackingTest.java @@ -37,6 +37,7 @@ import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.cql3.CQLTester; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.AbstractTableOperation; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.CompactionStrategy; import org.apache.cassandra.db.compaction.CompactionStrategyStatistics; @@ -213,10 +214,27 @@ private void printStats() public static synchronized void getStats() { - operations.add(CompactionManager.instance.getSSTableTasks()); + operations.add(CompactionManager.instance.getSSTableTasks() + .stream() + .map(BackgroundCompactionTrackingTest::snapshot) + .collect(Collectors.toList())); statistics.add(strategy.getStatistics()); } + private static TableOperation.Progress snapshot(TableOperation.Progress progress) + { + // Take a snapshot to make sure we are capturing the values at the time ActiveOperations is called. + // This is to make sure we report the completed state then, and not end up okay because they were corrected + // when some component closed at a later time. + return new AbstractTableOperation.OperationProgress(progress.metadata(), + progress.operationType(), + progress.completed(), + progress.total(), + progress.unit(), + progress.operationId(), + progress.sstables()); + } + static CompactionStrategy strategy; static List> statistics; static List> operations; diff --git a/test/unit/org/apache/cassandra/repair/ValidatorTest.java b/test/unit/org/apache/cassandra/repair/ValidatorTest.java index 62715beb1920..3d67ec82cabb 100644 --- a/test/unit/org/apache/cassandra/repair/ValidatorTest.java +++ b/test/unit/org/apache/cassandra/repair/ValidatorTest.java @@ -26,9 +26,14 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.compaction.ActiveOperations; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.CompactionsTest; +import org.apache.cassandra.db.compaction.OperationType; +import org.apache.cassandra.db.compaction.TableOperation; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.junit.After; import org.junit.Before; @@ -205,6 +210,17 @@ public void simpleValidationTest(int n) throws Exception Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE, false, PreviewKind.NONE); + AtomicReference progressOnCompletion = new AtomicReference<>(); + CompactionManager.instance.active.registerListener(new ActiveOperations.CompactionProgressListener() + { + @Override + public void onCompleted(TableOperation.Progress progressOnCompleted) + { + if (progressOnCompleted.metadata() == cfs.metadata()) + progressOnCompletion.set(progressOnCompleted); + } + }); + final CompletableFuture outgoingMessageSink = registerOutgoingMessageSink(); Validator validator = new Validator(desc, host, 0, true, false, PreviewKind.NONE); ValidationManager.instance.submitValidation(cfs, validator); @@ -221,6 +237,11 @@ public void simpleValidationTest(int n) throws Exception assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0); } assertEquals(m.trees.rowCount(), n); + + assertNotNull(progressOnCompletion.get()); + assertEquals(OperationType.VALIDATION, progressOnCompletion.get().operationType()); + assertTrue(progressOnCompletion.get().completed() > 0); + assertEquals(progressOnCompletion.get().total(), progressOnCompletion.get().completed()); } /*