Skip to content

Commit

Permalink
CNDB-12431: Fix wrong bytes read value being reported on completion o…
Browse files Browse the repository at this point in the history
…f validation (#1538)
  • Loading branch information
blambov authored Jan 29, 2025
1 parent 2e52cab commit cece65b
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,11 @@ public TableMetadata metadata()

long bytesRead()
{
long[] bytesReadByLevel = this.bytesReadByLevel;
return Arrays.stream(bytesReadByLevel).reduce(Long::sum).orElse(0L);
long bytesScanned = 0L;
for (ISSTableScanner scanner : scanners)
bytesScanned += scanner.getBytesScanned();

return bytesScanned;
}

long bytesRead(int level)
Expand All @@ -180,15 +183,6 @@ long totalSourceRows()
return Arrays.stream(mergedRowsHistogram).reduce(0L, Long::sum);
}

public long getTotalBytesScanned()
{
long bytesScanned = 0L;
for (ISSTableScanner scanner : scanners)
bytesScanned += scanner.getBytesScanned();

return bytesScanned;
}

public long getTotalCompressedSize()
{
long compressedSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,7 @@ void execute0()
if (writer.append(partition))
totalKeysWritten++;

long bytesScanned = compactionIterator.getTotalBytesScanned();
long bytesScanned = compactionIterator.bytesRead();

// Rate limit the scanners, and account for compression
if (compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.cql3.CQLTester;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.AbstractTableOperation;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionStrategy;
import org.apache.cassandra.db.compaction.CompactionStrategyStatistics;
Expand Down Expand Up @@ -213,10 +214,27 @@ private void printStats()

public static synchronized void getStats()
{
operations.add(CompactionManager.instance.getSSTableTasks());
operations.add(CompactionManager.instance.getSSTableTasks()
.stream()
.map(BackgroundCompactionTrackingTest::snapshot)
.collect(Collectors.toList()));
statistics.add(strategy.getStatistics());
}

private static TableOperation.Progress snapshot(TableOperation.Progress progress)
{
// Take a snapshot to make sure we are capturing the values at the time ActiveOperations is called.
// This is to make sure we report the completed state then, and not end up okay because they were corrected
// when some component closed at a later time.
return new AbstractTableOperation.OperationProgress(progress.metadata(),
progress.operationType(),
progress.completed(),
progress.total(),
progress.unit(),
progress.operationId(),
progress.sstables());
}

static CompactionStrategy strategy;
static List<List<CompactionStrategyStatistics>> statistics;
static List<List<TableOperation.Progress>> operations;
Expand Down
21 changes: 21 additions & 0 deletions test/unit/org/apache/cassandra/repair/ValidatorTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,14 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.compaction.ActiveOperations;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.compaction.CompactionsTest;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.db.compaction.TableOperation;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.junit.After;
import org.junit.Before;
Expand Down Expand Up @@ -205,6 +210,17 @@ public void simpleValidationTest(int n) throws Exception
Collections.singletonList(cfs), desc.ranges, false, ActiveRepairService.UNREPAIRED_SSTABLE,
false, PreviewKind.NONE);

AtomicReference<TableOperation.Progress> progressOnCompletion = new AtomicReference<>();
CompactionManager.instance.active.registerListener(new ActiveOperations.CompactionProgressListener()
{
@Override
public void onCompleted(TableOperation.Progress progressOnCompleted)
{
if (progressOnCompleted.metadata() == cfs.metadata())
progressOnCompletion.set(progressOnCompleted);
}
});

final CompletableFuture<Message> outgoingMessageSink = registerOutgoingMessageSink();
Validator validator = new Validator(desc, host, 0, true, false, PreviewKind.NONE);
ValidationManager.instance.submitValidation(cfs, validator);
Expand All @@ -221,6 +237,11 @@ public void simpleValidationTest(int n) throws Exception
assertEquals(Math.pow(2, Math.ceil(Math.log(n) / Math.log(2))), iterator.next().getValue().size(), 0.0);
}
assertEquals(m.trees.rowCount(), n);

assertNotNull(progressOnCompletion.get());
assertEquals(OperationType.VALIDATION, progressOnCompletion.get().operationType());
assertTrue(progressOnCompletion.get().completed() > 0);
assertEquals(progressOnCompletion.get().total(), progressOnCompletion.get().completed());
}

/*
Expand Down

0 comments on commit cece65b

Please sign in to comment.