diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index 3bd93a4f459c3f..4298e6068961b2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -81,6 +81,7 @@
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_BYTES;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_OPERATIONS;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_EXCEPTIONS;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
@@ -158,7 +159,8 @@ class LocalFSFileInputStream extends FSInputStream implements
         STREAM_READ_EXCEPTIONS,
         STREAM_READ_SEEK_OPERATIONS,
         STREAM_READ_SKIP_OPERATIONS,
-        STREAM_READ_SKIP_BYTES)
+        STREAM_READ_SKIP_BYTES,
+        STREAM_READ_VECTORED_OPERATIONS)
         .build();

     /** Reference to the bytes read counter for slightly faster counting. */
@@ -225,8 +227,7 @@ public int read() throws IOException {
         int value = fis.read();
         if (value >= 0) {
           this.position++;
-          statistics.incrementBytesRead(1);
-          bytesRead.addAndGet(1);
+          recordBytesRead(1);
         }
         return value;
       } catch (IOException e) {                 // unexpected exception
@@ -243,8 +244,7 @@ public int read(byte[] b, int off, int len) throws IOException {
         int value = fis.read(b, off, len);
         if (value > 0) {
           this.position += value;
-          statistics.incrementBytesRead(value);
-          bytesRead.addAndGet(value);
+          recordBytesRead(value);
         }
         return value;
       } catch (IOException e) {                 // unexpected exception
@@ -252,7 +252,18 @@ public int read(byte[] b, int off, int len) throws IOException {
         throw new FSError(e);                   // assume native fs error
       }
     }
-
+
+    /**
+     * Count the number of bytes read in fs and io statistics.
+     * @param count number of bytes read.
+     */
+    private void recordBytesRead(final int count) {
+      if (count > 0) {
+        statistics.incrementBytesRead(count);
+        bytesRead.addAndGet(count);
+      }
+    }
+
     @Override
     public int read(long position, byte[] b, int off, int len)
         throws IOException {
@@ -266,8 +277,7 @@ public int read(long position, byte[] b, int off, int len)
       try {
         int value = fis.getChannel().read(bb, position);
         if (value > 0) {
-          statistics.incrementBytesRead(value);
-          ioStatistics.incrementCounter(STREAM_READ_BYTES, value);
+          recordBytesRead(value);
         }
         return value;
       } catch (IOException e) {
@@ -328,6 +338,7 @@ public void readVectored(List<? extends FileRange> ranges,
     public void readVectored(final List<? extends FileRange> ranges,
         final IntFunction<ByteBuffer> allocate,
         final Consumer<ByteBuffer> release) throws IOException {
+      ioStatistics.incrementCounter(STREAM_READ_VECTORED_OPERATIONS);

       // Validate, but do not pass in a file length as it may change.
       List<? extends FileRange> sortedRanges = sortRangeList(ranges);
@@ -341,7 +352,8 @@ public void readVectored(final List<? extends FileRange> ranges,
         // Initiate the asynchronous reads.
         new AsyncHandler(getAsyncChannel(),
             sortedRanges,
-            pool)
+            pool,
+            this::recordBytesRead)
             .initiateRead();
       }
     }
@@ -372,20 +384,25 @@ private static class AsyncHandler implements CompletionHandler<Integer, Integer>
     /** Buffers being read. */
     private final ByteBuffer[] buffers;

+    /** Callback to update statistics. */
+    private final Consumer<Integer> statisticsUpdater;
+
     /**
      * Instantiate.
      * @param channel open channel.
      * @param ranges ranges to read.
      * @param allocateRelease pool for allocating buffers, and releasing on failure
+     * @param statisticsUpdater callback to update statistics.
      */
     AsyncHandler(
         final AsynchronousFileChannel channel,
         final List<? extends FileRange> ranges,
-        final ByteBufferPool allocateRelease) {
+        final ByteBufferPool allocateRelease,
+        final Consumer<Integer> statisticsUpdater) {
       this.channel = channel;
       this.ranges = ranges;
       this.buffers = new ByteBuffer[ranges.size()];
       this.allocateRelease = allocateRelease;
+      this.statisticsUpdater = statisticsUpdater;
     }

     /**
@@ -426,6 +443,8 @@ public void completed(Integer result, Integer rangeIndex) {
         // issue a read for the rest of the buffer
         channel.read(buffer, range.getOffset() + buffer.position(), rangeIndex, this);
       } else {
+        // read finished
+        statisticsUpdater.accept(range.getLength());
        // Flip the buffer and declare success.
         buffer.flip();
         range.getData().complete(buffer);
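
The change above is easiest to see from a caller's perspective. A minimal sketch of that view, not part of the patch: the class name, the file path and the range offsets are illustrative values only, and it assumes a local file already exists at that path. Before this patch, the local FS vectored-read path updated neither the per-thread FileSystem.Statistics nor the stream's IOStatistics counters.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;

public final class VectoredReadStatsDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    // illustrative path and offsets; the file must exist and be long enough.
    try (FSDataInputStream in = fs.open(new Path("/tmp/data.bin"))) {
      List<FileRange> ranges = Arrays.asList(
          FileRange.createFileRange(0, 100),
          FileRange.createFileRange(4096, 100));
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange r : ranges) {
        r.getData().get();   // block until each range completes
      }
      // With this patch, STREAM_READ_VECTORED_OPERATIONS is 1 and
      // STREAM_READ_BYTES includes the 200 bytes read above.
      System.out.println(ioStatisticsToPrettyString(in.getIOStatistics()));
    }
    // The per-thread statistics which Spark reads now also include the bytes.
    long bytesRead = FileSystem.getAllStatistics().stream()
        .mapToLong(FileSystem.Statistics::getBytesRead)
        .sum();
    System.out.println("bytes read: " + bytesRead);
  }
}
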
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index 248333da9c9083..5a70c9433204ad 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -206,10 +206,21 @@ public void testVectoredReadMultipleRanges() throws Exception {
       combinedFuture.get();

       validateVectoredReadResult(fileRanges, DATASET, 0);
+      assertionsWithinTestVectoredReadMultipleRanges(in, fileRanges);
       returnBuffersToPoolPostRead(fileRanges, pool);
     }
   }

+  /**
+   * Place to add some custom assertions within {@link #testVectoredReadMultipleRanges()}.
+   * @param in active input stream.
+   * @param fileRanges file ranges which were read.
+   */
+  protected void assertionsWithinTestVectoredReadMultipleRanges(final FSDataInputStream in,
+      final List<FileRange> fileRanges) {

+  }
+
   @Test
   public void testVectoredReadAndReadFully() throws Exception {
     List<FileRange> fileRanges = new ArrayList<>();
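
Any contract test suite can hook this extension point the same way. A hypothetical sketch follows; the class name is illustrative and not part of this patch, and LocalFSContract stands in for whatever contract the suite under test would create. It asserts only the new vectored-operations counter, for any stream whose IOStatistics publishes it.

import java.util.List;

import org.junit.jupiter.params.ParameterizedClass;
import org.junit.jupiter.params.provider.MethodSource;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;

import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS;

@ParameterizedClass(name = "buffer-{0}")
@MethodSource("params")
public class TestSomeFSContractVectoredRead extends AbstractContractVectoredReadTest {

  public TestSomeFSContractVectoredRead(final String bufferType) {
    super(bufferType);
  }

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new LocalFSContract(conf);   // illustrative; use the FS under test
  }

  @Override
  protected void assertionsWithinTestVectoredReadMultipleRanges(
      final FSDataInputStream in,
      final List<FileRange> fileRanges) {
    // exactly one readVectored() call was made by the test
    assertThatStatisticCounter(in.getIOStatistics(), STREAM_READ_VECTORED_OPERATIONS)
        .isEqualTo(1);
  }
}
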
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
index e2a9bfb444a2e9..ff5e4bc5ee991a 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
@@ -21,31 +21,39 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicLong;

 import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.MethodSource;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedClass;
-import org.junit.jupiter.params.provider.MethodSource;
+import org.apache.hadoop.fs.statistics.IOStatistics;

 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;

-@ParameterizedClass(name="buffer-{0}")
+@ParameterizedClass(name = "buffer-{0}")
 @MethodSource("params")
 public class TestLocalFSContractVectoredRead extends AbstractContractVectoredReadTest {

+  private long initialBytesRead;
+
   public TestLocalFSContractVectoredRead(final String bufferType) {
     super(bufferType);
   }
@@ -87,28 +95,28 @@ public void testChecksumValidationDuringVectoredReadSmallFile() throws Exception
    * @throws Exception any exception other than ChecksumException
    */
   private void validateCheckReadException(Path testPath,
-                                          int length,
-                                          List<FileRange> ranges) throws Exception {
+      int length,
+      List<FileRange> ranges) throws Exception {
     LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
     final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
-    try (FSDataOutputStream out = localFs.create(testPath, true)){
+    try (FSDataOutputStream out = localFs.create(testPath, true)) {
       out.write(datasetCorrect);
     }
     Path checksumPath = localFs.getChecksumFile(testPath);
     Assertions.assertThat(localFs.exists(checksumPath))
-            .describedAs("Checksum file should be present")
-            .isTrue();
+        .describedAs("Checksum file should be present")
+        .isTrue();
     CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
-    try (FSDataInputStream in = fis.get()){
+    try (FSDataInputStream in = fis.get()) {
       in.readVectored(ranges, getAllocate());
       validateVectoredReadResult(ranges, datasetCorrect, 0);
     }
     final byte[] datasetCorrupted = ContractTestUtils.dataset(length, 'a', 64);
-    try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)){
+    try (FSDataOutputStream out = localFs.getRaw().create(testPath, true)) {
       out.write(datasetCorrupted);
     }
     CompletableFuture<FSDataInputStream> fisN = localFs.openFile(testPath).build();
-    try (FSDataInputStream in = fisN.get()){
+    try (FSDataInputStream in = fisN.get()) {
       in.readVectored(ranges, getAllocate());
       // Expect checksum exception when data is updated directly through
       // raw local fs instance.
@@ -123,20 +131,68 @@ public void tesChecksumVectoredReadBoundaries() throws Exception {
     final int length = 1071;
     LocalFileSystem localFs = (LocalFileSystem) getFileSystem();
     final byte[] datasetCorrect = ContractTestUtils.dataset(length, 'a', 32);
-    try (FSDataOutputStream out = localFs.create(testPath, true)){
+    try (FSDataOutputStream out = localFs.create(testPath, true)) {
       out.write(datasetCorrect);
     }
     Path checksumPath = localFs.getChecksumFile(testPath);
     Assertions.assertThat(localFs.exists(checksumPath))
-            .describedAs("Checksum file should be present at {} ", checksumPath)
-            .isTrue();
+        .describedAs("Checksum file should be present at {} ", checksumPath)
+        .isTrue();
     CompletableFuture<FSDataInputStream> fis = localFs.openFile(testPath).build();
     List<FileRange> smallRange = new ArrayList<>();
     smallRange.add(FileRange.createFileRange(1000, 71));
-    try (FSDataInputStream in = fis.get()){
+    try (FSDataInputStream in = fis.get()) {
       in.readVectored(smallRange, getAllocate());
       validateVectoredReadResult(smallRange, datasetCorrect, 0);
     }
   }

+  /**
+   * Subclassed so that the bytes read count can be cached before the test runs.
+   */
+  @Test
+  @Override
+  public void testVectoredReadMultipleRanges() throws Exception {
+    initialBytesRead = getBytesRead();
+    super.testVectoredReadMultipleRanges();
+  }
+
+  /**
+   * Validate statistics.
+   * Sometimes the tests read more than expected, so the assertions use
+   * {@code isGreaterThanOrEqualTo()} rather than exact values.
+   */
+  @Override
+  protected void assertionsWithinTestVectoredReadMultipleRanges(
+      final FSDataInputStream in,
+      final List<FileRange> fileRanges) {
+
+    // check the iostats
+    final long totalVectorReadLength = fileRanges.stream().mapToLong(FileRange::getLength).sum();
+    final IOStatistics stats = in.getIOStatistics();
+    assertThatStatisticCounter(stats, STREAM_READ_VECTORED_OPERATIONS)
+        .describedAs(STREAM_READ_VECTORED_OPERATIONS + " in stream %s", stats)
+        .isEqualTo(1);
+    assertThatStatisticCounter(stats, STREAM_READ_BYTES)
+        .describedAs(STREAM_READ_BYTES + " bytes read in stream %s", stats)
+        .isGreaterThanOrEqualTo(totalVectorReadLength);
+
+    // validate filesystem stats, which went up by at least that amount;
+    // other reads are counted too, crc files in particular.
+    long currentBytesRead = getBytesRead();
+    assertThat(currentBytesRead)
+        .describedAs("bytes read in stream %s", in)
+        .isGreaterThanOrEqualTo(initialBytesRead + totalVectorReadLength);
+  }
+
+  /**
+   * The API is deprecated, but Spark uses it, and it's how the regression was found.
+   * This is how production code looks at these stats.
+   * @return counter of bytes read across all stores. Never reset.
+   */
+  private static long getBytesRead() {
+    AtomicLong bytes = new AtomicLong();
+    FileSystem.getAllStatistics().forEach(st -> bytes.addAndGet(st.getBytesRead()));
+    return bytes.get();
+  }
 }
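
As a closing note: getBytesRead() above deliberately mirrors the deprecated call pattern Spark uses, since that is where the regression showed up. For new code, a sketch of a non-deprecated per-stream equivalent built on the IOStatistics API this patch feeds; the helper class and method names are illustrative, not part of the patch.

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;

final class StreamStats {

  private StreamStats() {
  }

  /** Bytes read by one stream, from its own IOStatistics; 0 if not published. */
  static long streamBytesRead(FSDataInputStream in) {
    IOStatistics stats = retrieveIOStatistics(in);
    return stats == null
        ? 0
        : stats.counters().getOrDefault(STREAM_READ_BYTES, 0L);
  }
}
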