Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1230,7 +1230,9 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
long beginReadMS = Time.monotonicNow();
int nread = 0;
int ret;
while (true) {
// Stop once the slice is filled; an extra read with remaining()==0
// can trigger wasted slow-lane I/O in SCR.
while (tmp.hasRemaining()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution first. Just one nit concern, is it only one difference that return at L1235(after PR) or return at L1236(before PR)? It will perform a zero-byte transfer in-memory and return immediately if tmp.remaining()=0, So it is OK for me before PR. welcome discussion if I miss something. Thanks again.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. I hit the issue when I was testing short circuit read. If we call BlockReaderLocal.read with tmp.remaining() == 0, it will hit the slow lane via BlockReaderLocal::readWithBounceBuffer::fillDataBuf, which will actually allocate and read into the bounce buffer. But since user buffer is filled, we cannot drain into it and the read data is thrown away and wasted.

ret = reader.read(tmp);
if (ret <= 0) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,16 @@
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
Expand Down Expand Up @@ -364,4 +369,43 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
IOUtils.closeStream(out);
}
}

@Test
@Timeout(value = 60)
public void testAvoidsEmptyBufferRead() throws IOException {
Configuration conf = new Configuration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
DistributedFileSystem fs = cluster.getFileSystem();
DFSClient client = fs.getClient();
Path file = new Path("/preadFile");
int fileLength = 8192;
DFSTestUtil.createFile(fs, file, fileLength, (short) 1, 0L);

AtomicInteger emptyBufferReads = new AtomicInteger(0);
try (DFSInputStream in = new DFSInputStream(client, file.toString(),
true, null) {
@Override
protected BlockReader getBlockReader(LocatedBlock targetBlock,
long offsetInBlock, long length, InetSocketAddress targetAddr,
StorageType storageType, DatanodeInfo datanode)
throws IOException {
BlockReader real = super.getBlockReader(targetBlock, offsetInBlock,
length, targetAddr, storageType, datanode);
BlockReader wrapped = mock(BlockReader.class, delegatesTo(real));
doAnswer(inv -> {
if (!((ByteBuffer) inv.getArgument(0)).hasRemaining()) {
emptyBufferReads.incrementAndGet();
}
return real.read(inv.getArgument(0));
}).when(wrapped).read(any(ByteBuffer.class));
return wrapped;
}
}) {
byte[] dest = new byte[fileLength];
assertEquals(fileLength, in.read(0L, dest, 0, dest.length));
assertEquals(0, emptyBufferReads.get(),
"DFSInputStream should not call BlockReader.read with an empty buffer");
}
}
}
}
Loading