diff --git a/pom.xml b/pom.xml
index d81a901..2e1b693 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
 
     <groupId>com.qcloud.cos</groupId>
     <artifactId>hadoop-cos</artifactId>
-    <version>8.3.22</version>
+    <version>8.3.23</version>
     <packaging>jar</packaging>
 
     <name>Apache Hadoop Tencent Cloud COS Support</name>
@@ -201,6 +201,7 @@
                     <target>8</target>
                     <excludes>
                         <exclude>**/CosNFileSystemExt.java</exclude>
+                        <exclude>**/cosn/cache/LRURangeCache.java</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
index 79a50d4..5f7fd56 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
@@ -7,6 +7,7 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Paths;
 import java.util.Properties;
 
 /**
@@ -162,7 +163,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
     public static final long DEFAULT_THREAD_KEEP_ALIVE_TIME = 60L;
 
     public static final String READ_AHEAD_BLOCK_SIZE_KEY = "fs.cosn.read.ahead.block.size";
-    public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = 1 * Unit.MB;
+    public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = Unit.MB;
     public static final String READ_AHEAD_QUEUE_SIZE = "fs.cosn.read.ahead.queue.size";
     public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 6;
     // used to control getFileStatus list to judge dir whether exist.
@@ -284,6 +285,17 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
     public static final String COSN_READ_BUFFER_POOL_CAPACITY = "fs.cosn.read.buffer.pool.capacity";
     public static final long DEFAULT_READ_BUFFER_POOL_CAPACITY = -1;
 
+    // An experimental feature: a local page cache. Disabled by default; its capacity is bounded by
+    // fs.cosn.read.page.cache.file.num * fs.cosn.read.page.cache.number * the read-ahead block size.
+    public static final String COSN_READ_PAGE_CACHE_ENABLED = "fs.cosn.read.page.cache.enabled";
+    public static final String COSN_READ_PAGE_CACHE_DIR = "fs.cosn.read.page.cache.dir";
+    public static final String DEFAULT_READ_PAGE_CACHE_DIR = Paths.get(DEFAULT_TMP_DIR, "page_cache").toString();
+    public static final String COSN_READ_PAGE_CACHE_RANGE_CACHE_IMPL = "fs.cosn.read.page.cache.range.cache.impl";
+    public static final String DEFAULT_READ_PAGE_CACHE_RANGE_CACHE_IMPL = "org.apache.hadoop.fs.cosn.cache.LRURangeCache";
+    public static final String COSN_READ_PAGE_CACHE_FILE_NUM = "fs.cosn.read.page.cache.file.num";
+    public static final int DEFAULT_READ_PAGE_CACHE_FILE_NUM = 10;
+    public static final String COSN_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM = "fs.cosn.read.page.cache.number";
+    public static final int DEFAULT_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM = 200;
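+    // A hedged configuration sketch (property names are the keys above; the directory value is
+    // only an example path, adjust it to a local disk):
+    //   fs.cosn.read.page.cache.enabled  = true
+    //   fs.cosn.read.page.cache.dir      = /data/cosn/page_cache
+    //   fs.cosn.read.page.cache.file.num = 10    (files cached concurrently)
+    //   fs.cosn.read.page.cache.number   = 200   (pages cached per file)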
+
     public static final String COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS = "fs.cosn.read.buffer.allocate.timeout.seconds";
     public static final long DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS = 5;
 
diff --git a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
index 091b79a..d198268 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
@@ -4,6 +4,7 @@
 import org.apache.hadoop.fs.cosn.MemoryAllocator;
 import org.apache.hadoop.fs.cosn.CosNOutOfMemoryException;
 import org.apache.hadoop.fs.cosn.ReadBufferHolder;
+import org.apache.hadoop.fs.cosn.cache.PageCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -22,6 +23,8 @@
 
 import javax.annotation.Nullable;
 
+import static org.apache.hadoop.fs.CosNConfigKeys.DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS;
+
 
 public class CosNFSInputStream extends FSInputStream {
     public static final Logger LOG =
@@ -115,6 +118,13 @@ public void free() {
         }
     }
 
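+    /**
+     * Where reopen() should look for the data after a backward seek:
+     * the read-ahead queue, the just-evicted previous buffer, the local
+     * page cache, or nowhere (fall through to a normal reopen).
+     */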
+    private enum ReopenLocation {
+        PRE_READ_QUEUE,
+        PREVIOUS_BUFFER,
+        LOCAL_CACHE,
+        NONE
+    }
+
     private FileSystem.Statistics statistics;
     private final Configuration conf;
     private final NativeFileSystemStore store;
@@ -137,6 +147,10 @@ public void free() {
     // A "previous buffer" temporarily holds the head element just evicted from the read-ahead queue, to speed up small-range backward random reads.
     private ReadBuffer previousReadBuffer;
 
+    private final PageCache pageCache;
+
+    private ReopenLocation reopenLocation = ReopenLocation.NONE;
+
     /**
      * Input Stream
      *
@@ -153,7 +167,7 @@ public CosNFSInputStream(
             FileSystem.Statistics statistics,
             String key,
             FileStatus fileStatus,
-            ExecutorService readAheadExecutorService) {
+            ExecutorService readAheadExecutorService, PageCache pageCache) {
         super();
         this.conf = conf;
         this.store = store;
@@ -177,6 +191,7 @@ public CosNFSInputStream(
         this.readAheadExecutorService = readAheadExecutorService;
         this.readBufferQueue =
                 new ArrayDeque<>(this.maxReadPartNumber);
+        this.pageCache = pageCache;
         this.closed = new AtomicBoolean(false);
     }
 
@@ -185,6 +200,13 @@ private void tryFreeBuffer(ReadBuffer readBuffer) {
                 && readBuffer != previousReadBuffer
                 && readBuffer != currentReadBuffer
                 && (readBufferQueue.isEmpty() || readBufferQueue.peek() != readBuffer)) {
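+            // Spill the evicted read-ahead block into the local page cache (if enabled)
+            // before freeing it, so a later backward seek can still hit it.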
+            if (null != this.pageCache && readBuffer.getBuffer() != null) {
+                try {
+                    this.pageCache.put(new PageCache.Page(this.fileStatus.getPath().toString(), readBuffer.getStart(), readBuffer.getBuffer()));
+                } catch (IOException e) {
+                    LOG.warn("Failed to add page to the page cache", e);
+                }
+            }
             readBuffer.free();
         }
     }
@@ -219,7 +241,10 @@ private synchronized void reopen(long pos) throws IOException {
             // A random read occurred. For a small backward random read, first check whether it hits the queue-head read buffer that was just evicted.
             // If not, the random read either reaches back beyond the previous block, falls inside the read-ahead queue, or falls outside the queue entirely.
             // If it falls inside the read-ahead queue, the queue lookup below locates the target block directly; if it falls outside, the queue is drained and the stream repositions to the target block and offset.
-            if (null != this.previousReadBuffer && pos >= this.previousReadBuffer.getStart() && pos <= this.previousReadBuffer.getEnd()) {
+            if (this.reopenLocation == ReopenLocation.PREVIOUS_BUFFER
+                    && null != this.previousReadBuffer
+                    && pos >= this.previousReadBuffer.getStart()
+                    && pos <= this.previousReadBuffer.getEnd()) {
                 setCurrentReadBuffer(previousReadBuffer);
                 this.bufferStart = this.previousReadBuffer.getStart();
                 this.bufferEnd = this.previousReadBuffer.getEnd();
@@ -228,6 +253,31 @@ private synchronized void reopen(long pos) throws IOException {
                 this.nextPos = !this.readBufferQueue.isEmpty() ? this.readBufferQueue.getFirst().getStart() : pos + this.preReadPartSize;
                 return;
             }
+
+            // Check whether the target position is in the local page cache.
+            if (this.reopenLocation == ReopenLocation.LOCAL_CACHE
+                    && null != this.pageCache) {
+                PageCache.Page page = this.pageCache.get(this.fileStatus.getPath().toString(), pos);
+                if (page != null) {
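+                    // Rehydrate the cached page into a pooled ReadBuffer so the normal
+                    // read path can serve it like a freshly fetched read-ahead block.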
+                    ReadBuffer readBuffer = new ReadBuffer(page.getOffsetInFile(), page.getOffsetInFile() + page.getContent().length - 1);
+                    try {
+                        readBuffer.allocate(conf.getLong(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS,
+                                DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS), TimeUnit.SECONDS);
+                        System.arraycopy(page.getContent(), 0, readBuffer.getBuffer(), 0, page.getContent().length);
+                        readBuffer.setStatus(ReadBuffer.SUCCESS);
+                        setCurrentReadBuffer(readBuffer);
+                        this.bufferStart = readBuffer.getStart();
+                        this.bufferEnd = readBuffer.getEnd();
+                        this.position = pos;
+                        this.partRemaining = (this.bufferEnd - this.bufferStart + 1) - (pos - this.bufferStart);
+                        this.nextPos = !this.readBufferQueue.isEmpty() ? this.readBufferQueue.getFirst().getStart() : pos + this.preReadPartSize;
+                        return;
+                    } catch (Exception e) {
+                        LOG.error("Failed to allocate a read buffer for the cached page; falling back to a normal reopen.", e);
+                        // continue to reopen
+                    }
+                }
+            }
         }
         // Locate the block to read within the read-ahead queue.
         while (!this.readBufferQueue.isEmpty()) {
@@ -338,14 +388,22 @@ public void seek(long pos) throws IOException {
             // Within the read-ahead block that was just evicted.
             this.position = pos;
             this.partRemaining = -1;    // trigger a reopen
+            this.reopenLocation = ReopenLocation.PREVIOUS_BUFFER;
         } else if (!this.readBufferQueue.isEmpty() && pos >= this.readBufferQueue.getFirst().getStart() && pos <= this.readBufferQueue.getLast().getEnd()) {
             // Within the read-ahead queue.
             this.position = pos;
             this.partRemaining = -1;    // trigger a reopen
+            this.reopenLocation = ReopenLocation.PRE_READ_QUEUE;
+        } else if (null != this.pageCache && this.pageCache.contains(this.fileStatus.getPath().toString(), pos)) {
+            // Hit in the local page cache.
+            this.position = pos;
+            this.partRemaining = -1;    // trigger a reopen
+            this.reopenLocation = ReopenLocation.LOCAL_CACHE;
         } else {
-            // 既不在预读队列中,也不在上一次刚刚被淘汰的预读块中,那么直接定位到要读的块和位置
+            // Neither in the read-ahead queue, the just-evicted block, nor the local cache: position directly to the target block and offset.
             this.position = pos;
             this.partRemaining = -1;
+            this.reopenLocation = ReopenLocation.NONE;
         }
     }
 
@@ -441,6 +499,7 @@ public int available() throws IOException {
 
         return (int) remaining;
     }
+
     @Override
     public void close() throws IOException {
         if (this.closed.get()) {
@@ -453,6 +512,9 @@ public void close() throws IOException {
         }
         setCurrentReadBuffer(null);
         setPreviousReadBuffer(null);
+        if (null != this.pageCache) {
+            this.pageCache.remove(this.fileStatus.getPath().toString());
+        }
     }
 
     private void checkOpened() throws IOException {
diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
index 76a7b3e..b6c2f91 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
@@ -15,6 +15,8 @@
 import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;
 import org.apache.hadoop.fs.cosn.ReadBufferHolder;
 import org.apache.hadoop.fs.cosn.Unit;
+import org.apache.hadoop.fs.cosn.cache.LocalPageCache;
+import org.apache.hadoop.fs.cosn.cache.PageCache;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -76,6 +78,8 @@ public class CosNFileSystem extends FileSystem {
 
     private RangerCredentialsClient rangerCredentialsClient;
 
+    private PageCache localPageCache = null;
+
     static final String CSE_ALGORITHM_USER_METADATA = "client-side-encryption-cek-alg";
     private int symbolicLinkSizeThreshold;
 
@@ -149,11 +153,16 @@ public void initialize(URI uri, Configuration conf) throws IOException {
 
         BufferPool.getInstance().initialize(this.getConf());
         if (this.getConf().getBoolean(CosNConfigKeys.COSN_POSIX_EXTENSION_ENABLED,
-            CosNConfigKeys.DEFAULT_COSN_POSIX_EXTENSION_ENABLED)) {
+                CosNConfigKeys.DEFAULT_COSN_POSIX_EXTENSION_ENABLED)) {
             // Initialized only when POSIX extension semantics are enabled.
             LocalRandomAccessMappedBufferPool.getInstance().initialize(this.getConf());
         }
 
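+        // Experimental: a process-wide page cache shared by every CosN input stream.
+        // LocalPageCache is a reference-counted singleton, so several FileSystem
+        // instances in one JVM share the same cache.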
+        if (this.getConf().getBoolean(CosNConfigKeys.COSN_READ_PAGE_CACHE_ENABLED, false)) {
+            this.localPageCache = LocalPageCache.getInstance();
+            this.localPageCache.initialize(this.getConf());
+        }
+
         // initialize the thread pool
         int uploadThreadPoolSize = this.getConf().getInt(
                 CosNConfigKeys.UPLOAD_THREAD_POOL_SIZE_KEY,
@@ -193,7 +202,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
                 new RejectedExecutionHandler() {
                     @Override
                     public void rejectedExecution(Runnable r,
-                            ThreadPoolExecutor executor) {
+                                                  ThreadPoolExecutor executor) {
                         if (!executor.isShutdown()) {
                             try {
                                 executor.getQueue().put(r);
@@ -223,7 +232,7 @@ public void rejectedExecution(Runnable r,
                 new RejectedExecutionHandler() {
                     @Override
                     public void rejectedExecution(Runnable r,
-                            ThreadPoolExecutor executor) {
+                                                  ThreadPoolExecutor executor) {
                         if (!executor.isShutdown()) {
                             try {
                                 executor.getQueue().put(r);
@@ -259,7 +268,7 @@ public void rejectedExecution(Runnable r,
                 new RejectedExecutionHandler() {
                     @Override
                     public void rejectedExecution(Runnable r,
-                            ThreadPoolExecutor executor) {
+                                                  ThreadPoolExecutor executor) {
                         if (!executor.isShutdown()) {
                             try {
                                 executor.getQueue().put(r);
@@ -274,15 +283,15 @@ public void rejectedExecution(Runnable r,
 
         this.symbolicLinkSizeThreshold = this.getConf().getInt(
                 CosNConfigKeys.COSN_SYMBOLIC_SIZE_THRESHOLD, CosNConfigKeys.DEFAULT_COSN_SYMBOLIC_SIZE_THRESHOLD);
-        this.directoryFirstEnabled =  this.getConf().getBoolean(CosNConfigKeys.COSN_FILESTATUS_DIR_FIRST_ENABLED,
-            CosNConfigKeys.DEFAULT_FILESTATUS_DIR_FIRST_ENABLED);
+        this.directoryFirstEnabled = this.getConf().getBoolean(CosNConfigKeys.COSN_FILESTATUS_DIR_FIRST_ENABLED,
+                CosNConfigKeys.DEFAULT_FILESTATUS_DIR_FIRST_ENABLED);
         ReadBufferHolder.initialize(this.getConf().getLong(CosNConfigKeys.COSN_READ_BUFFER_POOL_CAPACITY,
-            CosNConfigKeys.DEFAULT_READ_BUFFER_POOL_CAPACITY));
+                CosNConfigKeys.DEFAULT_READ_BUFFER_POOL_CAPACITY));
         if (this.getConf().getInt(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS, 5) < 0) {
             throw new IllegalArgumentException("fs.cosn.read.buffer.allocate.timeout.seconds cannot be negative.");
         }
         this.createOpCheckExistFile = this.getConf().getBoolean(CosNConfigKeys.COSN_CREATE_FILE_EXIST_OP_ENABLED,
-            CosNConfigKeys.DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE);
+                CosNConfigKeys.DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE);
     }
 
     @Override
@@ -302,7 +311,7 @@ public Path getHomeDirectory() {
 
     @Override
     public FSDataOutputStream append(Path f, int bufferSize,
-        Progressable progress) throws IOException {
+                                     Progressable progress) throws IOException {
         if (this.isPosixBucket) {
             throw new UnsupportedOperationException(
                     "The posix bucket does not support append operation through S3 gateway");
@@ -336,56 +345,56 @@ public FSDataOutputStream append(Path f, int bufferSize,
         Path absolutePath = makeAbsolute(f);
         String cosKey = pathToKey(absolutePath);
         if (this.getConf().getBoolean(CosNConfigKeys.COSN_POSIX_EXTENSION_ENABLED,
-            CosNConfigKeys.DEFAULT_COSN_POSIX_EXTENSION_ENABLED)) {
+                CosNConfigKeys.DEFAULT_COSN_POSIX_EXTENSION_ENABLED)) {
             // Dynamic class loading and instantiation are used here so that, in the default case (no random-write support), there is no dependency on the ofs-sdk-definition jar.
             Class<?> seekableOutputStreamClass;
             Object seekableOutputStream;
             try {
                 seekableOutputStreamClass = Class.forName("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream");
                 Constructor<?> constructor = seekableOutputStreamClass.getConstructor(Configuration.class, NativeFileSystemStore.class,
-                    String.class, ExecutorService.class, ExecutorService.class);
+                        String.class, ExecutorService.class, ExecutorService.class);
                 seekableOutputStream = constructor.newInstance(this.getConf(), this.nativeStore, cosKey, this.boundedIOThreadPool, this.boundedCopyThreadPool);
             } catch (ClassNotFoundException e) {
                 throw new IOException("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream can not be found. " +
-                    "please make sure that ofs-sdk-definition.jar is placed in the classpath.", e);
+                        "please make sure that ofs-sdk-definition.jar is placed in the classpath.", e);
             } catch (InvocationTargetException e) {
                 throw new IOException(e);
             } catch (NoSuchMethodException e) {
                 throw new IOException("Failed to find the constructor of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
             } catch (InstantiationException e) {
                 throw new IOException("Failed to create the object of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
             } catch (IllegalAccessException e) {
                 throw new IOException("Failed to access the constructor of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
             }
 
             try {
                 Class<?> seekableFSDataOutputStreamClass = Class.forName("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream");
                 Constructor<?> constructor = seekableFSDataOutputStreamClass.getConstructor(
-                    Class.forName("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream"),
-                    FileSystem.Statistics.class);
+                        Class.forName("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream"),
+                        FileSystem.Statistics.class);
                 return (FSDataOutputStream) constructor.newInstance(seekableOutputStream, statistics);
             } catch (ClassNotFoundException e) {
                 throw new IOException("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream can not be found. " +
-                    "please make sure that ofs-sdk-definition.jar is placed in the classpath.", e);
+                        "please make sure that ofs-sdk-definition.jar is placed in the classpath.", e);
             } catch (NoSuchMethodException e) {
                 throw new IOException("Failed to find the constructor of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
             } catch (InvocationTargetException e) {
                 throw new IOException(e);
             } catch (InstantiationException e) {
                 throw new IOException("Failed to create the object of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
             } catch (IllegalAccessException e) {
                 throw new IOException("Failed to access the constructor of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
             }
         } else {
             return new FSDataOutputStream(new CosNExtendedFSDataOutputStream(
-                this.getConf(), this.nativeStore, cosKey, this.boundedCopyThreadPool, this.boundedCopyThreadPool,true),
-                statistics, fileStatus.getLen());
+                    this.getConf(), this.nativeStore, cosKey, this.boundedCopyThreadPool, this.boundedCopyThreadPool, true),
+                    statistics, fileStatus.getLen());
         }
     }
 
@@ -394,7 +403,7 @@ public boolean truncate(Path f, long newLength) throws IOException {
         // truncate is supported by default.
         if (this.isPosixBucket) {
             throw new UnsupportedOperationException(
-                "The posix bucket does not support the truncate operation through S3 gateway.");
+                    "The posix bucket does not support the truncate operation through S3 gateway.");
         }
 
         // Not supported when client-side encryption is enabled in the configuration.
@@ -476,19 +485,19 @@ public FSDataOutputStream create(Path f, FsPermission permission,
                     throw new FileAlreadyExistsException("Directory already exists: " + f);
                 }
             } catch (FileNotFoundException ignore) {
-            // NOTE: 这里之前认为可能会出现从 COS 的 SDK 或者 API 上传了一个 / 结尾的有内容对象
-            // 那么再在这个文件前缀下面成功创建新的对象而不报错的话,其实是不符合文件系统语义规范的。
-            // 同时,也是为了保证一个完整的目录结构,但是确实会带来元数据查询请求的明显放大。
-            // 不过这里,因为一般不会出现 / 结尾的内容对象,即使出现也不会覆盖丢失(因为这里相当于它的一个commonPrefix,原始对象还在COS里面)
-            // 所以决定去掉这个检查,来改善优化性能。
-            // validatePath(f)
+                // NOTE: It was previously assumed that a non-empty object ending in "/" could be uploaded via the COS SDK or API,
+                // and that silently creating new objects under such a prefix would violate file system semantics.
+                // The check also helped keep the directory structure complete, but it clearly amplified metadata queries.
+                // Since objects ending in "/" rarely occur, and even then nothing is overwritten or lost (the path acts as a
+                // commonPrefix and the original object remains in COS), the check was removed to improve performance.
+                // validatePath(f)
             }
         }
 
         Path absolutePath = makeAbsolute(f);
         String key = pathToKey(absolutePath);
         if (this.getConf().getBoolean(CosNConfigKeys.COSN_FLUSH_ENABLED,
-            CosNConfigKeys.DEFAULT_COSN_FLUSH_ENABLED)) {
+                CosNConfigKeys.DEFAULT_COSN_FLUSH_ENABLED)) {
             // Need to support the synchronous flush.
             return new FSDataOutputStream(
                     new CosNExtendedFSDataOutputStream(this.getConf(), nativeStore, key,
@@ -591,7 +600,7 @@ private void internalRecursiveDelete(String key, int listMaxLength) throws IOExc
             priorLastKey = listing.getPriorLastKey();
 
             if (this.operationCancellingStatusProviderThreadLocal.get() != null
-            && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
+                    && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
                 LOG.warn("The delete operation is cancelled. key: {}.", key);
                 throw new IOException("The delete operation is cancelled. key: " + key);
             }
@@ -712,7 +721,7 @@ private FileStatus getFileStatus(Path f, Set<FileStatusProbeEnum> probes) throws
      * Otherwise, return it as a file.
      */
     private FileStatus getFileStatusHelper(FileMetadata meta, Path absolutePath, String key,
-        String headRequestId) throws IOException {
+                                           String headRequestId) throws IOException {
         if (directoryFirstEnabled && meta.getLength() == 0) {
             if (!key.endsWith(PATH_DELIMITER)) {
                 key += PATH_DELIMITER;
@@ -732,6 +741,7 @@ private FileStatus getFileStatusHelper(FileMetadata meta, Path absolutePath, Str
 
     /**
      * Query the directory via the list interface.
+     *
      * @return null if the directory does not exist
      */
     private FileStatus getFileStatusByList(String key, Path absolutePath, String headRequestId) throws IOException {
@@ -833,7 +843,7 @@ public FileStatus[] listStatus(Path f) throws IOException {
             priorLastKey = listing.getPriorLastKey();
 
             if (this.operationCancellingStatusProviderThreadLocal.get() != null
-            && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
+                    && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
                 LOG.warn("The list operation is cancelled. key: {}.", key);
                 throw new IOException("The list operation is cancelled. key: " + key);
             }
@@ -1011,7 +1021,7 @@ public FSDataInputStream open(Path f, int bufferSize, FileStatus fileStatus) thr
         String key = pathToKey(absolutePath);
         return new FSDataInputStream(new CosNFSBufferedFSInputStream(
                 new CosNFSInputStream(this.getConf(), nativeStore, statistics, key,
-                        fileStatus, this.boundedIOThreadPool),
+                        fileStatus, this.boundedIOThreadPool, this.localPageCache),
                 bufferSize));
     }
 
@@ -1461,22 +1471,22 @@ public void createSymlink(Path target, Path link, boolean createParent)
         }
 
         try {
-          FileStatus parentStatus = this.getFileStatus(link.getParent());
-          if (!parentStatus.isDirectory()) {
-            throw new ParentNotDirectoryException(
-                String.format("The parent path of the symlink [%s] is not a directory.", link));
-          }
+            FileStatus parentStatus = this.getFileStatus(link.getParent());
+            if (!parentStatus.isDirectory()) {
+                throw new ParentNotDirectoryException(
+                        String.format("The parent path of the symlink [%s] is not a directory.", link));
+            }
         } catch (FileNotFoundException parentDirNotFoundException) {
-          if (createParent) {
-            LOG.debug("The parent directory of the symlink [{}] does not exist, " +
-                "creating it.", link.getParent());
-            if (!this.mkdirs(link.getParent())) {
-              throw new IOException(String.format(
-                  "Failed to create the parent directory of the symlink [%s].", link));
+            if (createParent) {
+                LOG.debug("The parent directory of the symlink [{}] does not exist, " +
+                        "creating it.", link.getParent());
+                if (!this.mkdirs(link.getParent())) {
+                    throw new IOException(String.format(
+                            "Failed to create the parent directory of the symlink [%s].", link));
+                }
+            } else {
+                throw parentDirNotFoundException;
             }
-          } else {
-            throw parentDirNotFoundException;
-          }
         }
 
         try {
@@ -1529,9 +1539,9 @@ public FileStatus getFileLinkStatus(final Path f)
 
     @Override
     public boolean supportsSymlinks() {
-      return this.getConf().getBoolean(
-          CosNConfigKeys.COSN_SUPPORT_SYMLINK_ENABLED,
-          CosNConfigKeys.DEFAULT_COSN_SUPPORT_SYMLINK_ENABLED);
+        return this.getConf().getBoolean(
+                CosNConfigKeys.COSN_SUPPORT_SYMLINK_ENABLED,
+                CosNConfigKeys.DEFAULT_COSN_SUPPORT_SYMLINK_ENABLED);
     }
 
     /**
@@ -1599,6 +1609,9 @@ public void close() throws IOException {
                 LOG.error("boundedIOThreadPool shutdown interrupted.", e);
             }
             BufferPool.getInstance().close();
+            if (null != this.localPageCache) {
+                this.localPageCache.close();
+            }
             super.close();
         } finally {
             // The copy and delete pools involve metadata operations, so they are released last.
diff --git a/src/main/java/org/apache/hadoop/fs/CosNUtils.java b/src/main/java/org/apache/hadoop/fs/CosNUtils.java
index 825de03..ae3555e 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNUtils.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNUtils.java
@@ -331,6 +331,46 @@ public static Path keyToPath(String key, String pathDelimiter) {
         }
     }
 
+    /**
+     * Create a temporary directory for buffer files.
+     *
+     * @param tmpDir the path of the temporary directory to create
+     * @return the created (or already existing) directory
+     * @throws IOException if the directory cannot be created
+     */
+    public static File createTempDir(String tmpDir) throws IOException {
+        File file = new File(tmpDir);
+        if (!file.exists()) {
+            LOG.debug("Temp directory: [{}] does not exist. Create it first.",
+                    file);
+            if (file.mkdirs()) {
+                if (!file.setWritable(true, false) || !file.setReadable(true, false)
+                        || !file.setExecutable(true, false)) {
+                    LOG.warn("Set the buffer dir: [{}]'s permission [writable,"
+                            + "readable, executable] failed.", file);
+                }
+                LOG.debug("Temp directory: [{}] is created successfully.",
+                        file.getAbsolutePath());
+            } else {
+                // The directory may have been created concurrently by another
+                // process; only fail if it still does not exist.
+                if (!file.exists()) {
+                    throw new IOException("Temp directory: " + file.getAbsolutePath() + " could not be created.");
+                }
+            }
+        } else {
+            LOG.debug("Temp directory: {} already exists.", file.getAbsolutePath());
+            // Check if the tmp dir has read and write permissions.
+            if (!CosNUtils.checkDirectoryRWPermissions(tmpDir)) {
+                String exceptionMsg = String.format("The tmp dir does not have read or write permissions. dir: %s", tmpDir);
+                throw new IllegalArgumentException(exceptionMsg);
+            }
+        }
+
+        return file;
+    }
+
     public static boolean checkDirectoryRWPermissions(String directoryPath) {
         java.nio.file.Path path = Paths.get(directoryPath);
 
@@ -347,4 +387,27 @@ public static boolean checkDirectoryRWPermissions(String directoryPath) {
 
         return isReadable && isWritable;
     }
+
+    /**
+     * Clear the temporary directory.
+     * This method deletes all files and directories under the specified temporary directory.
+     *
+     * @param tmpDir the path of the temporary directory to clear
+     */
+    public static void clearTempDir(String tmpDir) {
+        File file = new File(tmpDir);
+        if (file.exists() && file.isDirectory()) {
+            File[] files = file.listFiles();
+            if (files != null) {
+                for (File f : files) {
+                    if (f.isFile()) {
+                        f.delete();
+                    } else if (f.isDirectory()) {
+                        clearTempDir(f.getAbsolutePath());
+                        f.delete();
+                    }
+                }
+            }
+        }
+    }
 }
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/Constants.java b/src/main/java/org/apache/hadoop/fs/cosn/Constants.java
index 87b74f8..dce4b56 100644
--- a/src/main/java/org/apache/hadoop/fs/cosn/Constants.java
+++ b/src/main/java/org/apache/hadoop/fs/cosn/Constants.java
@@ -10,6 +10,9 @@ private Constants() {
     // Suffix for local cache file name
     public static final String BLOCK_TMP_FILE_SUFFIX = "_local_block_cache";
 
+    public static final String PAGE_TEMP_FILE_PREFIX = "cos_";
+    public static final String PAGE_TEMP_FILE_SUFFIX = "_local_page_cache";
+
     // Crc32c server response header key
     public static final String CRC32C_RESP_HEADER = "x-cos-hash-crc32c";
     // Crc32c agent request header key
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java
index 4077861..673b5de 100644
--- a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java
+++ b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java
@@ -44,6 +44,15 @@ public CosNByteBuffer put(byte[] src, int offset, int length) throws IOException
         return this;
     }
 
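+    /**
+     * Write a 4-byte int at the current position and advance the tracked
+     * write position, mirroring put(byte[], int, int) above.
+     */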
+    public CosNByteBuffer putInt(int value) throws IOException {
+        if (this.byteBuffer.remaining() < Integer.BYTES) {
+            throw new IOException("There is no remaining in the buffer for an int.");
+        }
+        this.byteBuffer.putInt(value);
+        this.nextWritePosition = this.byteBuffer.position();
+        return this;
+    }
+
     public byte get() {
         return this.byteBuffer.get();
     }
@@ -53,6 +62,14 @@ public CosNByteBuffer get(byte[] dst, int offset, int length) {
         return this;
     }
 
+    public int getInt() {
+        return this.byteBuffer.getInt();
+    }
+
+    public int getInt(int index) {
+        return this.byteBuffer.getInt(index);
+    }
+
     public int capacity() {
         return this.byteBuffer.capacity();
     }
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java
index 737e67c..594691c 100644
--- a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java
+++ b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java
@@ -24,48 +24,12 @@ public class CosNMappedBufferFactory implements CosNBufferFactory {
 
     public CosNMappedBufferFactory(String[] tmpDirList, boolean deleteOnExit) throws IOException {
         for (String tmpDir : tmpDirList) {
-            File createDir = CosNMappedBufferFactory.createDir(tmpDir);
+            File createDir = CosNUtils.createTempDir(tmpDir);
             tmpDirs.add(createDir);
         }
         this.deleteOnExit = deleteOnExit;
     }
 
-    private static File createDir(String tmpDir) throws IOException {
-        File file = new File(tmpDir);
-        if (!file.exists()) {
-            LOG.debug("Buffer dir: [{}] does not exist. Create it first.",
-                    file);
-            if (file.mkdirs()) {
-                if (!file.setWritable(true, false) || !file.setReadable(true, false)
-                        || !file.setExecutable(true, false)) {
-                    LOG.warn("Set the buffer dir: [{}]'s permission [writable,"
-                            + "readable, executable] failed.", file);
-                }
-                LOG.debug("Buffer dir: [{}] is created successfully.",
-                        file.getAbsolutePath());
-            } else {
-                // Once again, check if it has been created successfully.
-                // Prevent problems created by multiple processes at the same
-                // time.
-                if (!file.exists()) {
-                    throw new IOException("buffer dir:" + file.getAbsolutePath()
-                            + " is created unsuccessfully");
-                }
-            }
-        } else {
-            LOG.debug("buffer dir: {} already exists.",
-                    file.getAbsolutePath());
-            // Check whether you have read and write permissions for the directory during initialization
-            if (!CosNUtils.checkDirectoryRWPermissions(tmpDir)) {
-                String exceptionMsg = String.format("The tmp dir does not have read or write permissions." +
-                        "dir: %s", tmpDir);
-                throw new IllegalArgumentException(exceptionMsg);
-            }
-        }
-
-        return file;
-    }
-
     @Override
     public CosNByteBuffer create(int size) {
         return this.create(Constants.BLOCK_TMP_FILE_PREFIX,
@@ -89,7 +53,7 @@ public CosNByteBuffer create(String prefix, String suffix, int size) {
             LOG.warn("The tmp dir does not exist.");
             // try to create the tmp directory.
             try {
-                CosNMappedBufferFactory.createDir(tmpDir.getAbsolutePath());
+                CosNUtils.createTempDir(tmpDir.getAbsolutePath());
             } catch (IOException e) {
                 LOG.error("Try to create the tmp dir [{}] failed.", tmpDir.getAbsolutePath(), e);
                 return null;
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/LRURangeCache.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/LRURangeCache.java
new file mode 100644
index 0000000..48bb2b3
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/LRURangeCache.java
@@ -0,0 +1,158 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
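+/**
+ * A {@link RangeCache} backed by a Guava {@link TreeRangeMap} for range
+ * lookups, paired with an access-ordered {@link LinkedHashMap} that evicts
+ * the least-recently-used range once the configured capacity is exceeded.
+ */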
+public class LRURangeCache<K extends Comparable<K>, V> extends RangeCache<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(LRURangeCache.class);
+
+    private final int capacity;
+    private final RangeMap<K, V> rangeMap;
+    private final LinkedHashMap<Range<K>, Boolean> lruMap;
+
+    public LRURangeCache(int capacity) {
+        super(0);
+        this.capacity = capacity;
+        this.rangeMap = TreeRangeMap.create();
+        this.lruMap = new LinkedHashMap<Range<K>, Boolean>(capacity, 0.75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<Range<K>, Boolean> eldest) {
+                if (size() > LRURangeCache.this.capacity) {
+                    V removedValue = rangeMap.get(eldest.getKey().lowerEndpoint());
+                    rangeMap.remove(eldest.getKey());
+                    if (removalListener != null) {
+                        LOG.debug("Removing key {} from LRURangeCache.", eldest.getKey());
+                        RemovalNotification<RangeCache.Entry<K>, V> notification = new RemovalNotification<>(
+                                convertRangeToEntry(eldest.getKey()), removedValue);
+                        removalListener.onRemoval(notification);
+                    }
+                    return true;
+                }
+                return false;
+            }
+        };
+    }
+
+    // Put a range and its associated value into the cache.
+    @Override
+    public void put(RangeCache.Entry<K> entry, V value) {
+        // Respect the entry's declared bound types instead of forcing closed-open.
+        Range<K> range = convertRangeHelper(entry);
+        rangeMap.put(range, value);
+        lruMap.put(range, Boolean.TRUE);
+    }
+
+    // Get the value whose cached range contains the key.
+    @Override
+    public V get(K key) {
+        Map.Entry<Range<K>, V> entry = rangeMap.getEntry(key);
+        if (entry == null) {
+            return null;
+        }
+        // Touch the access-ordered map to refresh this range's LRU position.
+        lruMap.get(entry.getKey());
+        return entry.getValue();
+    }
+
+    @Override
+    public Entry<K> getEntry(K key) {
+        Map.Entry<Range<K>, V> entry = rangeMap.getEntry(key);
+        return entry != null ? convertRangeToEntry(entry.getKey()) : null;
+    }
+
+    @Override
+    public boolean contains(K key) {
+        return rangeMap.get(key) != null;
+    }
+
+    @Override
+    public void remove(K key) {
+        Map.Entry<Range<K>, V> entry = rangeMap.getEntry(key);
+        if (entry != null) {
+            Range<K> range = entry.getKey();
+            V value = entry.getValue();
+            rangeMap.remove(range);
+            lruMap.remove(range);
+            if (removalListener != null) {
+                RemovalNotification<RangeCache.Entry<K>, V> notification = new RemovalNotification<>(
+                        convertRangeToEntry(range), value);
+                removalListener.onRemoval(notification);
+            }
+        }
+    }
+
+    // Return the number of cached ranges.
+    public int size() {
+        return lruMap.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return lruMap.isEmpty();
+    }
+
+    @Override
+    public void clear() {
+        // Fire the removal callback for every entry, then empty both maps.
+        for (Map.Entry<Range<K>, Boolean> entry : lruMap.entrySet()) {
+            Range<K> range = entry.getKey();
+            V value = rangeMap.get(range.lowerEndpoint());
+            if (value != null && removalListener != null) {
+                LOG.debug("Clearing key {} from LRURangeCache.", range);
+                RemovalNotification<RangeCache.Entry<K>, V> notification = new RemovalNotification<>(
+                        this.convertRangeToEntry(range), value);
+                removalListener.onRemoval(notification);
+            }
+        }
+        rangeMap.clear();
+        lruMap.clear();
+    }
+
+    @Override
+    public boolean containsOverlaps(RangeCache.Entry<K> entry) {
+        Range<K> range = Range.closedOpen(entry.getLowerEndpoint(), entry.getUpperEndpoint());
+        return rangeMap.asMapOfRanges().keySet().stream().anyMatch(r -> r.isConnected(range) && !r.intersection(range).isEmpty());
+    }
+
+    private Range<K> convertRangeHelper(RangeCache.Entry<K> entry) {
+        switch (entry.getRangeType()) {
+            case CLOSED_OPEN:
+                return Range.closedOpen(entry.getLowerEndpoint(), entry.getUpperEndpoint());
+            case OPEN_OPEN:
+                return Range.open(entry.getLowerEndpoint(), entry.getUpperEndpoint());
+            case OPEN_CLOSED:
+                return Range.openClosed(entry.getLowerEndpoint(), entry.getUpperEndpoint());
+            case CLOSED_CLOSED:
+                return Range.closed(entry.getLowerEndpoint(), entry.getUpperEndpoint());
+            default:
+                throw new IllegalArgumentException("Unsupported range type: " + entry.getRangeType());
+        }
+    }
+
+    private RangeCache.Entry<K> convertRangeToEntry(Range<K> range) {
+        if (range.lowerBoundType() == BoundType.OPEN && range.upperBoundType() == BoundType.OPEN) {
+            return RangeCache.Entry.open(range.lowerEndpoint(), range.upperEndpoint());
+        }
+
+        if (range.lowerBoundType() == BoundType.CLOSED && range.upperBoundType() == BoundType.OPEN) {
+            return RangeCache.Entry.closedOpen(range.lowerEndpoint(), range.upperEndpoint());
+        }
+
+        if (range.lowerBoundType() == BoundType.OPEN && range.upperBoundType() == BoundType.CLOSED) {
+            return RangeCache.Entry.openClosed(range.lowerEndpoint(), range.upperEndpoint());
+        }
+
+        if (range.lowerBoundType() == BoundType.CLOSED && range.upperBoundType() == BoundType.CLOSED) {
+            return RangeCache.Entry.closed(range.lowerEndpoint(), range.upperEndpoint());
+        }
+
+        throw new IllegalArgumentException(
+                String.format("Unsupported range: %s, lowerBoundType: %s, upperBoundType: %s.",
+                        range, range.lowerBoundType(), range.upperBoundType()));
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/LocalPageCache.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/LocalPageCache.java
new file mode 100644
index 0000000..2cc9ae8
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/LocalPageCache.java
@@ -0,0 +1,377 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CosNConfigKeys;
+import org.apache.hadoop.fs.CosNUtils;
+import org.apache.hadoop.fs.cosn.buffer.CosNBufferFactory;
+import org.apache.hadoop.fs.cosn.buffer.CosNByteBuffer;
+import org.apache.hadoop.fs.cosn.buffer.CosNMappedBufferFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class LocalPageCache implements PageCache {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalPageCache.class);
+
+    private static volatile LocalPageCache instance;
+
+    private final AtomicInteger referCount = new AtomicInteger(0);
+    private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+    private Map<String, FileCacheEntry> fileCacheMap;
+    private int pageNum;
+    private int pageSize;
+    private String cacheDir;
+    private CosNBufferFactory cosNBufferFactory;
+    private Constructor<? extends RangeCache> rangeCacheConstructor;
+
+    private final class FileCacheEntry {
+        // Each slot's header stores the length of the page data in that slot.
+        private static final int META_HEAD_SIZE = Integer.BYTES;
+        // Lock guarding this file's cache entry.
+        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+        // <rangeInFile, SlotIndex>
+        private final RangeCache<Long, Integer> rangeIndex;
+        // 0 means the slot is free, 1 means it is occupied.
+        private final byte[] pageIndexes;
+        private final CosNByteBuffer pagesBuffer;
+        private final AtomicBoolean isValid;
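+
+        // Layout of pagesBuffer: a single buffer divided into pageNum fixed-size
+        // slots, each holding a 4-byte length header followed by up to pageSize
+        // bytes of page data.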
+
+        public FileCacheEntry() throws IOException {
+            this.pageIndexes = new byte[pageNum];
+            try {
+                this.rangeIndex = rangeCacheConstructor.newInstance(pageNum);
+            } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+                throw new IOException("Failed to create RangeCache instance", e);
+            }
+            this.rangeIndex.setRemovalListener(notification -> {
+                this.lock.writeLock().lock();
+                try {
+                    this.pageIndexes[notification.getValue()] = 0;
+                } finally {
+                    this.lock.writeLock().unlock();
+                }
+            });
+            pagesBuffer = cosNBufferFactory.create((META_HEAD_SIZE + pageSize) * pageNum);
+            this.isValid = new AtomicBoolean(true);
+        }
+
+        public void put(byte[] pageData, long offsetInFile) throws IOException {
+            if (pageData == null || pageData.length == 0) {
+                LOG.warn("Page data is null or empty; skipping the cache put.");
+                return;
+            }
+
+            if (pageData.length > pageSize) {
+                LOG.warn("Page data is larger than the configured page size; skipping the cache put.");
+                return;
+            }
+
+            this.lock.writeLock().lock();
+            try {
+                if (!this.isValid.get()) {
+                    LOG.warn("FileCacheEntry is not valid, cannot put page data.");
+                    return; // the entry has already been released
+                }
+                if (this.rangeIndex.contains(offsetInFile)) {
+                    LOG.debug("Page at offset {} is already cached; skipping.", offsetInFile);
+                    return;
+                }
+                // Find a free slot.
+                int slotIndex = -1;
+                for (int i = 0; i < this.pageIndexes.length; i++) {
+                    if (this.pageIndexes[i] == 0) {
+                        slotIndex = i;
+                        break;
+                    }
+                }
+                if (slotIndex == -1) {
+                    LOG.debug("No free slot available in the page cache.");
+                    return; // no free slot
+                }
+                // Write the page into the slot.
+                this.pagesBuffer.position(slotIndex * (META_HEAD_SIZE + pageSize));
+                // Write the data length into the slot header.
+                this.pagesBuffer.putInt(pageData.length);
+                // Write the actual page data.
+                this.pagesBuffer.put(pageData, 0, pageData.length);
+
+                // Update the slot-occupancy index and the range index.
+                this.pageIndexes[slotIndex] = 1;
+                this.rangeIndex.put(RangeCache.Entry.closedOpen(offsetInFile,
+                    offsetInFile + pageData.length), slotIndex);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        }
+
+        public byte[] get(long offset) throws IOException {
+            this.lock.readLock().lock();
+            try {
+                if (!this.isValid.get()) {
+                    LOG.warn("FileCacheEntry is not valid, cannot get page data.");
+                    return null; // the entry has already been released
+                }
+                if (!this.rangeIndex.contains(offset)) {
+                    LOG.debug("Page not found in cache for offset: {}", offset);
+                    return null;
+                }
+
+                int slotIndex = this.rangeIndex.get(offset);
+                RangeCache.Entry<Long> rangeEntry = this.rangeIndex.getEntry(offset);
+                int dataSize = this.pagesBuffer.getInt(slotIndex * (META_HEAD_SIZE + pageSize));
+                // The returned buffer carries offsetInFile in its header, followed by the actual page data.
+                ByteBuffer data = ByteBuffer.allocate(Long.BYTES + dataSize);
+                data.clear();
+                data.putLong(rangeEntry.getLowerEndpoint());
+                this.pagesBuffer.position(slotIndex * (META_HEAD_SIZE + pageSize) + META_HEAD_SIZE);
+                this.pagesBuffer.get(data.array(), Long.BYTES, dataSize);
+                return data.array();
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        }
+
+        public void remove(long offset) throws IOException {
+            this.lock.writeLock().lock();
+            try {
+                if (!this.isValid.get()) {
+                    LOG.warn("FileCacheEntry is not valid, cannot remove page data.");
+                    return; // the entry has already been released
+                }
+                if (!this.rangeIndex.contains(offset)) {
+                    LOG.warn("Page with offset {} not found in cache.", offset);
+                    return; // no page cached at this offset
+                }
+                int slotIndex = this.rangeIndex.get(offset);
+                this.rangeIndex.remove(offset); // drop it from the range index
+                this.pageIndexes[slotIndex] = 0; // mark the slot as free
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        }
+
+        public void release() throws IOException {
+            this.lock.writeLock().lock();
+            try {
+                if (!this.isValid.getAndSet(false)) {
+                    LOG.warn("FileCacheEntry is already released, nothing to do.");
+                    return;
+                }
+                this.rangeIndex.clear();
+                // Reset the slot-occupancy index.
+                Arrays.fill(this.pageIndexes, (byte) 0);
+                this.pagesBuffer.close();
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        }
+
+        public boolean contains(long offset) {
+            this.lock.readLock().lock();
+            try {
+                if (!this.isValid.get()) {
+                    LOG.warn("FileCacheEntry is not valid, cannot check contains.");
+                    return false; // the entry has already been released
+                }
+                return this.rangeIndex.contains(offset);
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        }
+
+    }
+
+    private LocalPageCache() {
+    }
+
+    // Get the singleton instance (double-checked locking).
+    public static LocalPageCache getInstance() {
+        if (null == instance) {
+            synchronized (LocalPageCache.class) {
+                if (null == instance) {
+                    LOG.info("Created new LocalPageCache instance.");
+                    instance = new LocalPageCache();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public synchronized void initialize(Configuration conf) throws IOException {
+        if (isInitialized.get()) {
+            LOG.info("Cache is already initialized, incrementing referCount. " +
+                    "Current count: {}", referCount.get());
+            this.referCount.incrementAndGet();
+            return; // already initialized
+        }
+        int fileNumber = conf.getInt(CosNConfigKeys.COSN_READ_PAGE_CACHE_FILE_NUM,
+                CosNConfigKeys.DEFAULT_READ_PAGE_CACHE_FILE_NUM);
+        this.pageNum = conf.getInt(CosNConfigKeys.COSN_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM,
+                CosNConfigKeys.DEFAULT_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM);
+        this.fileCacheMap = new ConcurrentHashMap<>(fileNumber);
+        this.pageSize = conf.getInt(CosNConfigKeys.READ_AHEAD_BLOCK_SIZE_KEY, (int) CosNConfigKeys.DEFAULT_READ_AHEAD_BLOCK_SIZE);
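+        // The page size equals the read-ahead block size, so an evicted
+        // read-ahead block always fits into exactly one cache slot.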
+        this.cacheDir = conf.get(CosNConfigKeys.COSN_READ_PAGE_CACHE_DIR, CosNConfigKeys.DEFAULT_READ_PAGE_CACHE_DIR);
+        this.cosNBufferFactory = new CosNMappedBufferFactory(new String[]{this.cacheDir}, true);
+        this.rangeCacheConstructor = buildRangeCacheConstructor(
+                conf.get(CosNConfigKeys.COSN_READ_PAGE_CACHE_RANGE_CACHE_IMPL,
+                        CosNConfigKeys.DEFAULT_READ_PAGE_CACHE_RANGE_CACHE_IMPL));
+
+        // Make sure the cache directory exists (create it if absent) and start from an empty state.
+        CosNUtils.createTempDir(this.cacheDir);
+        CosNUtils.clearTempDir(this.cacheDir);
+
+        // Count the initializing user too, so close() tears the cache down only after the last user leaves.
+        this.referCount.incrementAndGet();
+        isInitialized.set(true);
+    }
+
+    @Override
+    public void put(Page page) throws IOException {
+        this.checkInitialize();
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.computeIfAbsent(page.getFilePath(), key -> {
+            try {
+                return new FileCacheEntry();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+        fileCacheEntry.put(page.getContent(), page.getOffsetInFile());
+    }
+
+    @Override
+    public Page get(String filePath, long startOffsetInFile) throws IOException {
+        this.checkInitialize();
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.get(filePath);
+        if (fileCacheEntry == null) {
+            return null;
+        }
+
+        byte[] pageData = fileCacheEntry.get(startOffsetInFile);
+        if (null == pageData) {
+            return null;
+        }
+
+        ByteBuffer pageDataBuffer = ByteBuffer.wrap(pageData);
+        long offsetInFile = pageDataBuffer.getLong();
+        byte[] realData = new byte[pageDataBuffer.remaining()];
+        pageDataBuffer.get(realData);
+        return new Page(filePath, offsetInFile, realData);
+    }
+
+    @Override
+    public boolean contains(String filePath, long startOffsetInFile) throws IOException {
+        this.checkInitialize();
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.get(filePath);
+        if (fileCacheEntry == null || !fileCacheEntry.contains(startOffsetInFile)) {
+            LOG.debug("No cached page for file: {} at offset: {}", filePath, startOffsetInFile);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void remove(String filePath, long startOffsetInFile) throws IOException {
+        this.checkInitialize();
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.get(filePath);
+        if (fileCacheEntry == null) {
+            LOG.warn("FileCacheEntry for file {} not found, cannot remove page.", filePath);
+            return; // no cache entry for this file
+        }
+        fileCacheEntry.remove(startOffsetInFile);
+    }
+
+    @Override
+    public void remove(String filePath) throws IOException {
+        this.checkInitialize();
+
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.remove(filePath);
+        if (fileCacheEntry == null) {
+            LOG.warn("FileCacheEntry for file {} not found, cannot remove.", filePath);
+            return; // no cache entry for this file
+        }
+
+        try {
+            fileCacheEntry.release();
+        } catch (IOException e) {
+            LOG.error("Error releasing FileCacheEntry for file: {}", filePath, e);
+        }
+    }
+
+    @Override
+    public void clear() throws IOException {
+        this.checkInitialize();
+        Set<String> keys = new HashSet<>(this.fileCacheMap.keySet());
+        for (String key : keys) {
+            try {
+                this.remove(key);
+            } catch (IOException e) {
+                LOG.error("Error clearing cache for file: {}", key, e);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!this.isInitialized.get()) {
+            LOG.warn("LocalPageCache is not initialized, nothing to close.");
+            return; // not initialized; nothing to close
+        }
+
+        int remaining = this.referCount.decrementAndGet();
+        if (remaining > 0) {
+            LOG.info("Decremented referCount, current count: {}", remaining);
+            return; // other holders still reference the shared cache
+        }
+
+        LOG.info("Closing LocalPageCache, clearing all caches.");
+        // Release all cached pages.
+        try {
+            this.clear();
+            LOG.info("LocalPageCache closed successfully.");
+        } catch (IOException e) {
+            LOG.error("Error clearing LocalPageCache during close.", e);
+        }
+        this.isInitialized.set(false);
+    }
+
+    private void checkInitialize() throws IOException {
+        if (!this.isInitialized.get()) {
+            throw new IOException("The LocalPageCache has not been initialized.");
+        }
+    }
+
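+    // Resolves the configured RangeCache implementation reflectively, e.g.
+    // fs.cosn.read.page.cache.range.cache.impl=org.apache.hadoop.fs.cosn.cache.LRURangeCache.
+    // The implementation must expose a public constructor taking an int capacity.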
+    private static Constructor<? extends RangeCache> buildRangeCacheConstructor(String className) throws IOException {
+        try {
+            Class<?> clazz = Class.forName(className);
+            if (RangeCache.class.isAssignableFrom(clazz)) {
+                return clazz.asSubclass(RangeCache.class).getConstructor(int.class);
+            } else {
+                throw new IOException("Class " + className + " is not a subclass of RangeCache.");
+            }
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Failed to find RangeCache implementation: " + className, e);
+        } catch (NoSuchMethodException e) {
+            throw new IOException("No suitable constructor found for RangeCache implementation: " + className, e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/PageCache.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageCache.java
new file mode 100644
index 0000000..f6a21a1
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageCache.java
@@ -0,0 +1,57 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
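+/**
+ * A page cache for COS reads: blocks of file content are keyed by
+ * (filePath, offsetInFile). A minimal usage sketch, assuming a concrete
+ * implementation instance and a Hadoop Configuration named {@code conf}:
+ *
+ * <pre>
+ *     PageCache cache = ...; // e.g. a LocalPageCache instance
+ *     cache.initialize(conf);
+ *     cache.put(new PageCache.Page("/bucket/key", 0L, data));
+ *     PageCache.Page page = cache.get("/bucket/key", 0L);  // null on a miss
+ *     cache.close();
+ * </pre>
+ */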
+public interface PageCache {
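+    /**
+     * An immutable page: a contiguous block of {@code content} bytes from
+     * {@code filePath} starting at byte {@code offsetInFile}.
+     */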
+    class Page {
+        private final String filePath;
+        private final long offsetInFile;
+        private final byte[] content;
+
+        public Page(String filePath, long offsetInFile, byte[] content) {
+            this.filePath = filePath;
+            this.offsetInFile = offsetInFile;
+            this.content = content;
+        }
+
+        public String getFilePath() {
+            return filePath;
+        }
+
+        public long getOffsetInFile() {
+            return offsetInFile;
+        }
+
+        public byte[] getContent() {
+            return content;
+        }
+    }
+
+    void initialize(Configuration configuration) throws IOException;
+    void put(Page page) throws IOException;
+    Page get(String filePath, long startOffsetInFile) throws IOException;
+    boolean contains(String filePath, long startOffsetInFile) throws IOException;
+    void remove(String filePath, long startOffsetInFile) throws IOException;
+    void remove(String filePath) throws IOException;
+    void clear() throws IOException;
+    void close();
+}
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/PageFaultException.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageFaultException.java
new file mode 100644
index 0000000..39deb40
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageFaultException.java
@@ -0,0 +1,23 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import java.io.IOException;
+
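+/**
+ * Base IOException for page cache faults, such as a page that cannot be
+ * stored in or read back from the underlying cache.
+ */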
+public class PageFaultException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public PageFaultException(String message) {
+        super(message);
+    }
+
+    public PageFaultException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public PageFaultException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/PageOverlapException.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageOverlapException.java
new file mode 100644
index 0000000..79b5cb9
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageOverlapException.java
@@ -0,0 +1,18 @@
+package org.apache.hadoop.fs.cosn.cache;
+
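+/**
+ * Thrown when a page being inserted overlaps a range that is already cached.
+ */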
+public class PageOverlapException extends PageFaultException {
+    public PageOverlapException(String message) {
+        super(message);
+    }
+
+    public PageOverlapException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public PageOverlapException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/RangeCache.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/RangeCache.java
new file mode 100644
index 0000000..d507a38
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/RangeCache.java
@@ -0,0 +1,153 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
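+/**
+ * An abstract, capacity-bounded cache keyed by ranges over a comparable type.
+ * Implementations are loaded reflectively (see LocalPageCache) and must
+ * provide a public constructor that takes a single {@code int} capacity.
+ */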
+public abstract class RangeCache<K extends Comparable<K>, V> {
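+    /**
+     * A range key with explicit endpoint semantics; for example,
+     * {@code Entry.closedOpen(0L, 4096L)} denotes the range [0, 4096).
+     */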
+    public static final class Entry<K> implements Serializable {
+        private static final long serialVersionUID = 5916455377642165613L;
+
+        public enum RangeType {
+            CLOSED_OPEN, OPEN_CLOSED, OPEN_OPEN, CLOSED_CLOSED
+        }
+
+        public static <K extends Comparable<K>> Entry<K> closed(K lowerEndpoint, K upperEndpoint) {
+            return new Entry<>(lowerEndpoint, upperEndpoint, RangeType.CLOSED_CLOSED);
+        }
+
+        public static <K extends Comparable<K>> Entry<K> closedOpen(K lowerEndpoint, K upperEndpoint) {
+            return new Entry<>(lowerEndpoint, upperEndpoint, RangeType.CLOSED_OPEN);
+        }
+
+        public static <K extends Comparable<K>> Entry<K> openClosed(K lowerEndpoint, K upperEndpoint) {
+            return new Entry<>(lowerEndpoint, upperEndpoint, RangeType.OPEN_CLOSED);
+        }
+
+        public static <K extends Comparable<K>> Entry<K> open(K lowerEndpoint, K upperEndpoint) {
+            return new Entry<>(lowerEndpoint, upperEndpoint, RangeType.OPEN_OPEN);
+        }
+
+        private final K lowerEndpoint;
+        private final K upperEndpoint;
+        private final RangeType rangeType;
+
+        private Entry(K lowerEndpoint, K upperEndpoint, RangeType rangeType) {
+            this.lowerEndpoint = lowerEndpoint;
+            this.upperEndpoint = upperEndpoint;
+            this.rangeType = rangeType;
+        }
+
+        public K getLowerEndpoint() {
+            return lowerEndpoint;
+        }
+
+        public K getUpperEndpoint() {
+            return upperEndpoint;
+        }
+
+        public RangeType getRangeType() {
+            return rangeType;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Entry)) return false;
+            Entry<?> entry = (Entry<?>) o;
+            return Objects.equals(lowerEndpoint, entry.lowerEndpoint)
+                    && Objects.equals(upperEndpoint, entry.upperEndpoint)
+                    && rangeType == entry.rangeType;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(lowerEndpoint, upperEndpoint, rangeType);
+        }
+    }
+
+    protected final int capacity;
+    protected RemovalListener<Entry<K>, V> removalListener = null;
+
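+    /**
+     * An immutable (range, value) pair handed to the RemovalListener when an
+     * entry leaves the cache; {@link #setValue} is unsupported.
+     */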
+    public static class RemovalNotification<RANGE_KEY, V> implements Map.Entry<RANGE_KEY, V>, Serializable {
+        private static final long serialVersionUID = 2457865473217506034L;
+
+        private final RANGE_KEY rangeKey;
+        private final V value;
+
+        RemovalNotification(RANGE_KEY rangeKey, V value) {
+            this.rangeKey = rangeKey;
+            this.value = value;
+        }
+
+        @Override
+        public RANGE_KEY getKey() {
+            return this.rangeKey;
+        }
+
+        @Override
+        public V getValue() {
+            return this.value;
+        }
+
+        @Override
+        public V setValue(V value) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
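+    /**
+     * Callback invoked when an entry is removed or evicted, allowing callers
+     * to release any resources backing the evicted range.
+     */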
+    public interface RemovalListener<RANGE_KEY, V> {
+        void onRemoval(RemovalNotification<RANGE_KEY, V> notification);
+    }
+
+    protected RangeCache(int capacity) {
+        this.capacity = capacity;
+    }
+
+    public void setRemovalListener(RemovalListener<Entry<K>, V> removalListener) {
+        this.removalListener = removalListener;
+    }
+
+    public abstract void put(Entry<K> entry, V value);
+
+    public abstract V get(K key);
+
+    public abstract Entry<K> getEntry(K key);
+
+    public abstract boolean contains(K key);
+
+    public abstract void remove(K key);
+
+    public abstract int size();
+
+    public abstract boolean isEmpty();
+
+    public abstract void clear();
+
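+    /**
+     * Returns true if any cached range overlaps the given entry, e.g. so that
+     * callers can detect conflicting inserts (see {@link PageOverlapException}).
+     */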
+    public abstract boolean containsOverlaps(Entry<K> entry);
+
+    protected void notifyRemoval(RemovalNotification<Entry<K>, V> notification) {
+        if (removalListener != null) {
+            removalListener.onRemoval(notification);
+        }
+    }
+}