Skip to content

feature: add evicted buffer cache to improve read performance. #174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.qcloud.cos</groupId>
<artifactId>hadoop-cos</artifactId>
<version>8.3.22</version>
<version>8.3.23</version>
<packaging>jar</packaging>

<name>Apache Hadoop Tencent Cloud COS Support</name>
Expand Down Expand Up @@ -201,6 +201,7 @@
<target>8</target>
<excludes>
<exclude>**/CosNFileSystemExt.java</exclude>
<exclude>**/cosn/cache/LRURangeCache.java</exclude>
</excludes>
</configuration>
</plugin>
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Paths;
import java.util.Properties;

/**
Expand Down Expand Up @@ -162,7 +163,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final long DEFAULT_THREAD_KEEP_ALIVE_TIME = 60L;

public static final String READ_AHEAD_BLOCK_SIZE_KEY = "fs.cosn.read.ahead.block.size";
public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = 1 * Unit.MB;
public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = Unit.MB;
public static final String READ_AHEAD_QUEUE_SIZE = "fs.cosn.read.ahead.queue.size";
public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 6;
// used to control getFileStatus list to judge dir whether exist.
Expand Down Expand Up @@ -284,6 +285,17 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
public static final String COSN_READ_BUFFER_POOL_CAPACITY = "fs.cosn.read.buffer.pool.capacity";
public static final long DEFAULT_READ_BUFFER_POOL_CAPACITY = -1;

// An experimental feature: Page Cache. By default, it is disabled, and the max size is 1GB.
public static final String COSN_READ_PAGE_CACHE_ENABLED = "fs.cosn.read.page.cache.enabled";
public static final String COSN_READ_PAGE_CACHE_DIR = "fs.cosn.read.page.cache.dir";
public static final String DEFAULT_READ_PAGE_CACHE_DIR = Paths.get(DEFAULT_TMP_DIR, "page_cache").toString();
public static final String COSN_READ_PAGE_CACHE_RANGE_CACHE_IMPL = "fs.cosn.read.page.cache.range.cache.impl";
public static final String DEFAULT_READ_PAGE_CACHE_RANGE_CACHE_IMPL = "org.apache.hadoop.fs.cosn.cache.LRURangeCache";
public static final String COSN_READ_PAGE_CACHE_FILE_NUM = "fs.cosn.read.page.cache.file.num";
public static final int DEFAULT_READ_PAGE_CACHE_FILE_NUM = 10;
public static final String COSN_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM = "fs.cosn.read.page.cache.number";
public static final int DEFAULT_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM = 200;

public static final String COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS = "fs.cosn.read.buffer.allocate.timeout.seconds";
public static final long DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS = 5;

Expand Down
68 changes: 65 additions & 3 deletions src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.apache.hadoop.fs.cosn.MemoryAllocator;
import org.apache.hadoop.fs.cosn.CosNOutOfMemoryException;
import org.apache.hadoop.fs.cosn.ReadBufferHolder;
import org.apache.hadoop.fs.cosn.cache.PageCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -22,6 +23,8 @@

import javax.annotation.Nullable;

import static org.apache.hadoop.fs.CosNConfigKeys.DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS;


public class CosNFSInputStream extends FSInputStream {
public static final Logger LOG =
Expand Down Expand Up @@ -115,6 +118,13 @@ public void free() {
}
}

// Records where seek() found the target position can be served from, so that
// reopen() can take the matching fast path instead of issuing a new range read.
private enum ReopenLocation {
// position falls inside the range currently held by the read-ahead queue
PRE_READ_QUEUE,
// position falls inside the buffer most recently evicted from the queue head
PREVIOUS_BUFFER,
// position is available in the local page cache (when the page cache is enabled)
LOCAL_CACHE,
// no cached location applies; reopen must fetch the block from the store
NONE
}

private FileSystem.Statistics statistics;
private final Configuration conf;
private final NativeFileSystemStore store;
Expand All @@ -137,6 +147,10 @@ public void free() {
// Keep a "previous buffer" holding the element just evicted from the queue head,
// to speed up small-range backward random reads.
private ReadBuffer previousReadBuffer;

private final PageCache pageCache;

private ReopenLocation reopenLocation = ReopenLocation.NONE;

/**
* Input Stream
*
Expand All @@ -153,7 +167,7 @@ public CosNFSInputStream(
FileSystem.Statistics statistics,
String key,
FileStatus fileStatus,
ExecutorService readAheadExecutorService) {
ExecutorService readAheadExecutorService, PageCache pageCache) {
super();
this.conf = conf;
this.store = store;
Expand All @@ -177,6 +191,7 @@ public CosNFSInputStream(
this.readAheadExecutorService = readAheadExecutorService;
this.readBufferQueue =
new ArrayDeque<>(this.maxReadPartNumber);
this.pageCache = pageCache;
this.closed = new AtomicBoolean(false);
}

Expand All @@ -185,6 +200,13 @@ private void tryFreeBuffer(ReadBuffer readBuffer) {
&& readBuffer != previousReadBuffer
&& readBuffer != currentReadBuffer
&& (readBufferQueue.isEmpty() || readBufferQueue.peek() != readBuffer)) {
if (null != this.pageCache && readBuffer.getBuffer() != null) {
try {
this.pageCache.put(new PageCache.Page(this.fileStatus.getPath().toString(), readBuffer.getStart(), readBuffer.getBuffer()));
} catch (IOException e) {
LOG.warn("Failed to add page to the page cache", e);
}
}
readBuffer.free();
}
}
Expand Down Expand Up @@ -219,7 +241,10 @@ private synchronized void reopen(long pos) throws IOException {
// A random read occurred. For a small backward random read, first check whether it
// hits the queue-head buffer that was just evicted on the previous round.
// Otherwise the random read either falls within the read-ahead queue's range or beyond it.
// If it is within the queue's range, locate the target block by scanning the queue; if it is
// beyond the queue's range, the queue is drained and we re-position to the target block and offset.
if (null != this.previousReadBuffer && pos >= this.previousReadBuffer.getStart() && pos <= this.previousReadBuffer.getEnd()) {
if (this.reopenLocation == ReopenLocation.PREVIOUS_BUFFER
&& null != this.previousReadBuffer
&& pos >= this.previousReadBuffer.getStart()
&& pos <= this.previousReadBuffer.getEnd()) {
setCurrentReadBuffer(previousReadBuffer);
this.bufferStart = this.previousReadBuffer.getStart();
this.bufferEnd = this.previousReadBuffer.getEnd();
Expand All @@ -228,6 +253,31 @@ private synchronized void reopen(long pos) throws IOException {
this.nextPos = !this.readBufferQueue.isEmpty() ? this.readBufferQueue.getFirst().getStart() : pos + this.preReadPartSize;
return;
}

// Check whether the position can be served from the local page cache.
if (this.reopenLocation == ReopenLocation.LOCAL_CACHE
&& null != this.pageCache) {
PageCache.Page page = this.pageCache.get(this.fileStatus.getPath().toString(), pos);
if (page != null) {
ReadBuffer readBuffer = new ReadBuffer(page.getOffsetInFile(), page.getOffsetInFile() + page.getContent().length - 1);
try {
readBuffer.allocate(conf.getLong(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS,
DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS), TimeUnit.SECONDS);
System.arraycopy(page.getContent(), 0, readBuffer.getBuffer(), 0, page.getContent().length);
readBuffer.setStatus(ReadBuffer.SUCCESS);
setCurrentReadBuffer(readBuffer);
this.bufferStart = readBuffer.getStart();
this.bufferEnd = readBuffer.getEnd();
this.position = pos;
this.partRemaining = (this.bufferEnd - this.bufferStart + 1) - (pos - this.bufferStart);
this.nextPos = !this.readBufferQueue.isEmpty() ? this.readBufferQueue.getFirst().getStart() : pos + this.preReadPartSize;
return;
} catch (Exception e) {
LOG.error("allocate read buffer failed.", e);
// continue to reopen
}
}
}
}
// 在预读队列里面定位到要读的块
while (!this.readBufferQueue.isEmpty()) {
Expand Down Expand Up @@ -338,14 +388,22 @@ public void seek(long pos) throws IOException {
// Position is in the read-ahead buffer that was just evicted.
this.position = pos;
this.partRemaining = -1; // 触发 reopen
this.reopenLocation = ReopenLocation.PREVIOUS_BUFFER;
} else if (!this.readBufferQueue.isEmpty() && pos >= this.readBufferQueue.getFirst().getStart() && pos <= this.readBufferQueue.getLast().getEnd()) {
// Position is within the read-ahead queue.
this.position = pos;
this.partRemaining = -1; // 触发 reopen
this.reopenLocation = ReopenLocation.PRE_READ_QUEUE;
} else if (null != this.pageCache && this.pageCache.contains(this.fileStatus.getPath().toString(), pos)) {
// Position hits the local page (fragment) cache.
this.position = pos;
this.partRemaining = -1; // 触发 reopen
this.reopenLocation = ReopenLocation.LOCAL_CACHE;
} else {
// Neither in the read-ahead queue nor in the just-evicted read-ahead buffer: position directly to the target block and offset.
// Neither in the read-ahead queue, the just-evicted read-ahead buffer, nor the local cache: position directly to the target block and offset.
this.position = pos;
this.partRemaining = -1;
this.reopenLocation = ReopenLocation.NONE;
}
}

Expand Down Expand Up @@ -441,6 +499,7 @@ public int available() throws IOException {

return (int) remaining;
}

@Override
public void close() throws IOException {
if (this.closed.get()) {
Expand All @@ -453,6 +512,9 @@ public void close() throws IOException {
}
setCurrentReadBuffer(null);
setPreviousReadBuffer(null);
if (null != this.pageCache) {
this.pageCache.remove(this.fileStatus.getPath().toString());
}
}

private void checkOpened() throws IOException {
Expand Down
Loading