From 670875468a37c3672ffe1796257dfd6251f76720 Mon Sep 17 00:00:00 2001
From: Yang Yu
Date: Mon, 9 Jun 2025 16:55:44 +0800
Subject: [PATCH] feature: add evicted buffer cache to improve read
 performance.

Signed-off-by: Yang Yu
---
 pom.xml                                            |   3 +-
 .../org/apache/hadoop/fs/CosNConfigKeys.java       |  14 +-
 .../apache/hadoop/fs/CosNFSInputStream.java        |  68 +++-
 .../org/apache/hadoop/fs/CosNFileSystem.java       | 117 +++---
 .../java/org/apache/hadoop/fs/CosNUtils.java       |  63 +++
 .../org/apache/hadoop/fs/cosn/Constants.java       |   3 +
 .../hadoop/fs/cosn/buffer/CosNByteBuffer.java      |  17 +
 .../cosn/buffer/CosNMappedBufferFactory.java       |  40 +-
 .../hadoop/fs/cosn/cache/LRURangeCache.java        | 158 ++++++++
 .../hadoop/fs/cosn/cache/LocalPageCache.java       | 377 ++++++++++++++++++
 .../hadoop/fs/cosn/cache/PageCache.java            |  40 ++
 .../fs/cosn/cache/PageFaultException.java          |  19 +
 .../fs/cosn/cache/PageOverlapException.java        |  15 +
 .../hadoop/fs/cosn/cache/RangeCache.java           | 131 ++++++
 14 files changed, 970 insertions(+), 95 deletions(-)
 create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/cache/LRURangeCache.java
 create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/cache/LocalPageCache.java
 create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/cache/PageCache.java
 create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/cache/PageFaultException.java
 create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/cache/PageOverlapException.java
 create mode 100644 src/main/java/org/apache/hadoop/fs/cosn/cache/RangeCache.java

diff --git a/pom.xml b/pom.xml
index d81a9011..2e1b6930 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
     com.qcloud.cos
     hadoop-cos
-    8.3.22
+    8.3.23
     jar
     Apache Hadoop Tencent Cloud COS Support
@@ -201,6 +201,7 @@
                     8
                     **/CosNFileSystemExt.java
+                    **/cosn/cache/LRURangeCache.java

diff --git a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
index 79a50d48..5f7fd562 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNConfigKeys.java
@@ -7,6 +7,7 @@

 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.file.Paths;
 import java.util.Properties;

 /**
@@ -162,7 +163,7 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
     public static final long DEFAULT_THREAD_KEEP_ALIVE_TIME = 60L;

     public static final String READ_AHEAD_BLOCK_SIZE_KEY = "fs.cosn.read.ahead.block.size";
-    public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = 1 * Unit.MB;
+    public static final long DEFAULT_READ_AHEAD_BLOCK_SIZE = Unit.MB;
     public static final String READ_AHEAD_QUEUE_SIZE = "fs.cosn.read.ahead.queue.size";
     public static final int DEFAULT_READ_AHEAD_QUEUE_SIZE = 6;
     // used to control getFileStatus list to judge dir whether exist.
@@ -284,6 +285,17 @@ public class CosNConfigKeys extends CommonConfigurationKeys {
     public static final String COSN_READ_BUFFER_POOL_CAPACITY = "fs.cosn.read.buffer.pool.capacity";
     public static final long DEFAULT_READ_BUFFER_POOL_CAPACITY = -1;

+    // An experimental feature: Page Cache. Disabled by default. For each cached file, at most
+    // fs.cosn.read.page.cache.number pages of fs.cosn.read.ahead.block.size bytes each are kept.
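+    // A minimal enablement sketch (illustrative only; the directory below is an assumed
+    // path, not a default shipped with this patch):
+    //   Configuration conf = new Configuration();
+    //   conf.setBoolean("fs.cosn.read.page.cache.enabled", true);
+    //   conf.set("fs.cosn.read.page.cache.dir", "/data/cosn/page_cache");
+    //   FileSystem fs = FileSystem.get(URI.create("cosn://example-bucket-1250000000/"), conf);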
+    public static final String COSN_READ_PAGE_CACHE_ENABLED = "fs.cosn.read.page.cache.enabled";
+    public static final String COSN_READ_PAGE_CACHE_DIR = "fs.cosn.read.page.cache.dir";
+    public static final String DEFAULT_READ_PAGE_CACHE_DIR = Paths.get(DEFAULT_TMP_DIR, "page_cache").toString();
+    public static final String COSN_READ_PAGE_CACHE_RANGE_CACHE_IMPL = "fs.cosn.read.page.cache.range.cache.impl";
+    public static final String DEFAULT_READ_PAGE_CACHE_RANGE_CACHE_IMPL = "org.apache.hadoop.fs.cosn.cache.LRURangeCache";
+    public static final String COSN_READ_PAGE_CACHE_FILE_NUM = "fs.cosn.read.page.cache.file.num";
+    public static final int DEFAULT_READ_PAGE_CACHE_FILE_NUM = 10;
+    public static final String COSN_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM = "fs.cosn.read.page.cache.number";
+    public static final int DEFAULT_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM = 200;
+
     public static final String COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS = "fs.cosn.read.buffer.allocate.timeout.seconds";
     public static final long DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS = 5;

diff --git a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
index 091b79a9..d198268c 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNFSInputStream.java
@@ -4,6 +4,7 @@
 import org.apache.hadoop.fs.cosn.MemoryAllocator;
 import org.apache.hadoop.fs.cosn.CosNOutOfMemoryException;
 import org.apache.hadoop.fs.cosn.ReadBufferHolder;
+import org.apache.hadoop.fs.cosn.cache.PageCache;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,6 +23,8 @@

 import javax.annotation.Nullable;

+import static org.apache.hadoop.fs.CosNConfigKeys.DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS;
+
 public class CosNFSInputStream extends FSInputStream {

     public static final Logger LOG =
@@ -115,6 +118,13 @@ public void free() {
         }
     }

+    private enum ReopenLocation {
+        PRE_READ_QUEUE,
+        PREVIOUS_BUFFER,
+        LOCAL_CACHE,
+        NONE
+    }
+
     private FileSystem.Statistics statistics;
     private final Configuration conf;
     private final NativeFileSystemStore store;
@@ -137,6 +147,10 @@ public void free() {
     // A "previous buffer" that temporarily holds the queue-head element evicted from the read-ahead queue, used to optimize small-range backward random reads.
     private ReadBuffer previousReadBuffer;

+    private final PageCache pageCache;
+
+    private ReopenLocation reopenLocation = ReopenLocation.NONE;
+
     /**
      * Input Stream
      *
@@ -153,7 +167,7 @@ public CosNFSInputStream(
             FileSystem.Statistics statistics,
             String key,
             FileStatus fileStatus,
-            ExecutorService readAheadExecutorService) {
+            ExecutorService readAheadExecutorService, PageCache pageCache) {
         super();
         this.conf = conf;
         this.store = store;
@@ -177,6 +191,7 @@ public CosNFSInputStream(

         this.readAheadExecutorService = readAheadExecutorService;
         this.readBufferQueue = new ArrayDeque<>(this.maxReadPartNumber);
+        this.pageCache = pageCache;
         this.closed = new AtomicBoolean(false);
     }

@@ -185,6 +200,13 @@ private void tryFreeBuffer(ReadBuffer readBuffer) {
             && readBuffer != previousReadBuffer
             && readBuffer != currentReadBuffer
             && (readBufferQueue.isEmpty() || readBufferQueue.peek() != readBuffer)) {
+            if (null != this.pageCache && readBuffer.getBuffer() != null) {
+                try {
+                    this.pageCache.put(new PageCache.Page(this.fileStatus.getPath().toString(), readBuffer.getStart(), readBuffer.getBuffer()));
+                } catch (IOException e) {
+                    LOG.warn("Failed to add page to the page cache", e);
+                }
+            }
             readBuffer.free();
         }
     }
@@ -219,7 +241,10 @@ private synchronized void reopen(long pos) throws IOException {
         // A random read occurred. For a small backward random read, first check whether it hits the queue-head read buffer that was just evicted.
         // Otherwise, the random read is either a backward read beyond the previous block, or it falls inside or beyond the read-ahead queue.
         // If it falls inside the read-ahead queue, the queue lookup locates the target block directly; if it goes beyond the queue, the queue is drained and the stream repositions to the target block and offset.
-        if (null != this.previousReadBuffer && pos >= this.previousReadBuffer.getStart() && pos <= this.previousReadBuffer.getEnd()) {
+        if (this.reopenLocation == ReopenLocation.PREVIOUS_BUFFER
+                && null != this.previousReadBuffer
+                && pos >= this.previousReadBuffer.getStart()
+                && pos <= this.previousReadBuffer.getEnd()) {
             setCurrentReadBuffer(previousReadBuffer);
             this.bufferStart = this.previousReadBuffer.getStart();
             this.bufferEnd = this.previousReadBuffer.getEnd();
@@ -228,6 +253,31 @@ private synchronized void reopen(long pos) throws IOException {
             this.nextPos = !this.readBufferQueue.isEmpty() ? this.readBufferQueue.getFirst().getStart() : pos + this.preReadPartSize;
             return;
         }
+
+        // Check whether the position is in the local page cache.
+        if (this.reopenLocation == ReopenLocation.LOCAL_CACHE
+                && null != this.pageCache) {
+            PageCache.Page page = this.pageCache.get(this.fileStatus.getPath().toString(), pos);
+            if (page != null) {
+                ReadBuffer readBuffer = new ReadBuffer(page.getOffsetInFile(), page.getOffsetInFile() + page.getContent().length - 1);
+                try {
+                    readBuffer.allocate(conf.getLong(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS,
+                            DEFAULT_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS), TimeUnit.SECONDS);
+                    System.arraycopy(page.getContent(), 0, readBuffer.getBuffer(), 0, page.getContent().length);
+                    readBuffer.setStatus(ReadBuffer.SUCCESS);
+                    setCurrentReadBuffer(readBuffer);
+                    this.bufferStart = readBuffer.getStart();
+                    this.bufferEnd = readBuffer.getEnd();
+                    this.position = pos;
+                    this.partRemaining = (this.bufferEnd - this.bufferStart + 1) - (pos - this.bufferStart);
+                    this.nextPos = !this.readBufferQueue.isEmpty() ?
+                            this.readBufferQueue.getFirst().getStart() : pos + this.preReadPartSize;
+                    return;
+                } catch (Exception e) {
+                    LOG.error("allocate read buffer failed.", e);
+                    // continue to reopen
+                }
+            }
+        }
     }
     // Locate the target block in the read-ahead queue.
     while (!this.readBufferQueue.isEmpty()) {
@@ -338,14 +388,22 @@ public void seek(long pos) throws IOException {
             // Inside the read-ahead block that was just evicted.
             this.position = pos;
             this.partRemaining = -1; // trigger reopen
+            this.reopenLocation = ReopenLocation.PREVIOUS_BUFFER;
         } else if (!this.readBufferQueue.isEmpty()
                 && pos >= this.readBufferQueue.getFirst().getStart()
                 && pos <= this.readBufferQueue.getLast().getEnd()) {
             // Inside the read-ahead queue.
             this.position = pos;
             this.partRemaining = -1; // trigger reopen
+            this.reopenLocation = ReopenLocation.PRE_READ_QUEUE;
+        } else if (null != this.pageCache && this.pageCache.contains(this.fileStatus.getPath().toString(), pos)) {
+            // Hit the local page cache.
+            this.position = pos;
+            this.partRemaining = -1; // trigger reopen
+            this.reopenLocation = ReopenLocation.LOCAL_CACHE;
         } else {
-            // Neither in the read-ahead queue nor in the just-evicted read-ahead block, so position directly to the target block and offset.
+            // Neither in the read-ahead queue, nor in the just-evicted read-ahead block or the local cache, so position directly to the target block and offset.
             this.position = pos;
             this.partRemaining = -1;
+            this.reopenLocation = ReopenLocation.NONE;
         }
     }
@@ -441,6 +499,7 @@ public int available() throws IOException {
         return (int) remaining;
     }

+
     @Override
     public void close() throws IOException {
         if (this.closed.get()) {
@@ -453,6 +512,9 @@ public void close() throws IOException {
         }
         setCurrentReadBuffer(null);
         setPreviousReadBuffer(null);
+        if (null != this.pageCache) {
+            this.pageCache.remove(this.fileStatus.getPath().toString());
+        }
     }

     private void checkOpened() throws IOException {
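Seen from the caller's side, the seek()/reopen() changes above serve small backward random reads out of the local page cache instead of re-fetching from COS. A minimal sketch of that pattern, driven through the standard FSDataInputStream API (path, offsets, and sizes are illustrative assumptions, not taken from this patch):

    FSDataInputStream in = fs.open(new Path("cosn://example-bucket-1250000000/warehouse/part-0.parquet"));
    byte[] buf = new byte[4096];
    in.read(buf);                // sequential read; read-ahead fills the buffer queue
    in.seek(64L * 1024 * 1024);  // jump far ahead; exhausted queue heads are evicted into the page cache
    in.read(buf);
    in.seek(0L);                 // jump back into an already-read range:
                                 // seek() records ReopenLocation.LOCAL_CACHE
    in.read(buf);                // reopen() copies the page back out of LocalPageCache
    in.close();                  // close() drops this file's pages from the shared cache
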
diff --git a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
index 76a7b3ea..b6c2f918 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNFileSystem.java
@@ -15,6 +15,8 @@
 import org.apache.hadoop.fs.cosn.OperationCancellingStatusProvider;
 import org.apache.hadoop.fs.cosn.ReadBufferHolder;
 import org.apache.hadoop.fs.cosn.Unit;
+import org.apache.hadoop.fs.cosn.cache.LocalPageCache;
+import org.apache.hadoop.fs.cosn.cache.PageCache;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -76,6 +78,8 @@ public class CosNFileSystem extends FileSystem {

     private RangerCredentialsClient rangerCredentialsClient;

+    private PageCache localPageCache = null;
+
     static final String CSE_ALGORITHM_USER_METADATA = "client-side-encryption-cek-alg";

     private int symbolicLinkSizeThreshold;
@@ -149,11 +153,16 @@ public void initialize(URI uri, Configuration conf) throws IOException {
         BufferPool.getInstance().initialize(this.getConf());

         if (this.getConf().getBoolean(CosNConfigKeys.COSN_POSIX_EXTENSION_ENABLED,
-            CosNConfigKeys.DEFAULT_COSN_POSIX_EXTENSION_ENABLED)) {
+                CosNConfigKeys.DEFAULT_COSN_POSIX_EXTENSION_ENABLED)) {
             // Initialized only when POSIX extension semantics support is enabled.
             LocalRandomAccessMappedBufferPool.getInstance().initialize(this.getConf());
         }

+        if (this.getConf().getBoolean(CosNConfigKeys.COSN_READ_PAGE_CACHE_ENABLED, false)) {
+            this.localPageCache = LocalPageCache.getInstance();
+            this.localPageCache.initialize(this.getConf());
+        }
+
         // initialize the thread pool
         int uploadThreadPoolSize = this.getConf().getInt(
             CosNConfigKeys.UPLOAD_THREAD_POOL_SIZE_KEY,
@@ -193,7 +202,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
             new RejectedExecutionHandler() {
                 @Override
                 public void rejectedExecution(Runnable r,
-                    ThreadPoolExecutor executor) {
+                        ThreadPoolExecutor executor) {
                     if (!executor.isShutdown()) {
                         try {
                             executor.getQueue().put(r);
@@ -223,7 +232,7 @@ public void rejectedExecution(Runnable r,
             new RejectedExecutionHandler() {
                 @Override
                 public void rejectedExecution(Runnable r,
-                    ThreadPoolExecutor executor) {
+                        ThreadPoolExecutor executor) {
                     if (!executor.isShutdown()) {
                         try {
                             executor.getQueue().put(r);
@@ -259,7 +268,7 @@ public void rejectedExecution(Runnable r,
             new RejectedExecutionHandler() {
                 @Override
                 public void rejectedExecution(Runnable r,
-                    ThreadPoolExecutor executor) {
+                        ThreadPoolExecutor executor) {
                     if (!executor.isShutdown()) {
                         try {
                             executor.getQueue().put(r);
@@ -274,15 +283,15 @@ public void rejectedExecution(Runnable r,
         this.symbolicLinkSizeThreshold = this.getConf().getInt(
             CosNConfigKeys.COSN_SYMBOLIC_SIZE_THRESHOLD,
             CosNConfigKeys.DEFAULT_COSN_SYMBOLIC_SIZE_THRESHOLD);
-        this.directoryFirstEnabled = this.getConf().getBoolean(CosNConfigKeys.COSN_FILESTATUS_DIR_FIRST_ENABLED,
-            CosNConfigKeys.DEFAULT_FILESTATUS_DIR_FIRST_ENABLED);
+        this.directoryFirstEnabled = this.getConf().getBoolean(CosNConfigKeys.COSN_FILESTATUS_DIR_FIRST_ENABLED,
+                CosNConfigKeys.DEFAULT_FILESTATUS_DIR_FIRST_ENABLED);
         ReadBufferHolder.initialize(this.getConf().getLong(CosNConfigKeys.COSN_READ_BUFFER_POOL_CAPACITY,
-            CosNConfigKeys.DEFAULT_READ_BUFFER_POOL_CAPACITY));
+                CosNConfigKeys.DEFAULT_READ_BUFFER_POOL_CAPACITY));
         if (this.getConf().getInt(CosNConfigKeys.COSN_READ_BUFFER_ALLOCATE_TIMEOUT_SECONDS, 5) < 0) {
             throw new IllegalArgumentException("fs.cosn.read.buffer.allocate.timeout.seconds cannot be negative.");
         }
         this.createOpCheckExistFile = this.getConf().getBoolean(CosNConfigKeys.COSN_CREATE_FILE_EXIST_OP_ENABLED,
-            CosNConfigKeys.DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE);
+                CosNConfigKeys.DEFAULT_COSN_COMPLETE_MPU_CHECK_ENABLE);
     }

     @Override
@@ -302,7 +311,7 @@ public Path getHomeDirectory() {

     @Override
     public FSDataOutputStream append(Path f, int bufferSize,
-        Progressable progress) throws IOException {
+            Progressable progress) throws IOException {
         if (this.isPosixBucket) {
             throw new UnsupportedOperationException(
                 "The posix bucket does not support append operation through S3 gateway");
@@ -336,56 +345,56 @@ public FSDataOutputStream append(Path f, int bufferSize,
         Path absolutePath = makeAbsolute(f);
         String cosKey = pathToKey(absolutePath);
         if (this.getConf().getBoolean(CosNConfigKeys.COSN_POSIX_EXTENSION_ENABLED,
-            CosNConfigKeys.DEFAULT_COSN_POSIX_EXTENSION_ENABLED)) {
+                CosNConfigKeys.DEFAULT_COSN_POSIX_EXTENSION_ENABLED)) {
             // Dynamic class loading and instantiation are used here so that, in the default case
             // (i.e. when random writes are not supported), there is no dependency on the ofs-sdk-definition jar.
             Class<?> seekableOutputStreamClass;
             Object seekableOutputStream;
             try {
                 seekableOutputStreamClass = Class.forName("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream");
                 Constructor<?> constructor = seekableOutputStreamClass.getConstructor(Configuration.class, NativeFileSystemStore.class,
-                    String.class, ExecutorService.class, ExecutorService.class);
+                        String.class, ExecutorService.class, ExecutorService.class);
                 seekableOutputStream = constructor.newInstance(this.getConf(), this.nativeStore, cosKey, this.boundedIOThreadPool, this.boundedCopyThreadPool);
             } catch (ClassNotFoundException e) {
                 throw new IOException("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream can not be found. " +
-                    "please make sure that ofs-sdk-definition.jar is placed in the classpath.", e);
+                        "please make sure that ofs-sdk-definition.jar is placed in the classpath.", e);
             } catch (InvocationTargetException e) {
                 throw new IOException(e);
             } catch (NoSuchMethodException e) {
                 throw new IOException("Failed to find the constructor of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
             } catch (InstantiationException e) {
                 throw new IOException("Failed to create the object of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
             } catch (IllegalAccessException e) {
                 throw new IOException("Failed to access the constructor of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream", e);
             }
             try {
                 Class<?> seekableFSDataOutputStreamClass = Class.forName("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream");
                 Constructor<?> constructor = seekableFSDataOutputStreamClass.getConstructor(
-                    Class.forName("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream"),
-                    FileSystem.Statistics.class);
+                        Class.forName("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream$SeekableOutputStream"),
+                        FileSystem.Statistics.class);
                 return (FSDataOutputStream) constructor.newInstance(seekableOutputStream, statistics);
             } catch (ClassNotFoundException e) {
                 throw new IOException("org.apache.hadoop.fs.CosNSeekableFSDataOutputStream can not be found. " +
-                    "please make sure that ofs-sdk-definition.jar is placed in the classpath.", e);
+                        "please make sure that ofs-sdk-definition.jar is placed in the classpath.", e);
             } catch (NoSuchMethodException e) {
                 throw new IOException("Failed to find the constructor of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
             } catch (InvocationTargetException e) {
                 throw new IOException(e);
             } catch (InstantiationException e) {
                 throw new IOException("Failed to create the object of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
             } catch (IllegalAccessException e) {
                 throw new IOException("Failed to access the constructor of the " +
-                    "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
+                        "org.apache.hadoop.fs.CosNSeekableFSDataOutputStream", e);
             }
         } else {
             return new FSDataOutputStream(new CosNExtendedFSDataOutputStream(
-                this.getConf(), this.nativeStore, cosKey, this.boundedCopyThreadPool, this.boundedCopyThreadPool,true),
-                statistics, fileStatus.getLen());
+                    this.getConf(), this.nativeStore, cosKey, this.boundedCopyThreadPool, this.boundedCopyThreadPool, true),
+                    statistics, fileStatus.getLen());
         }
     }

@@ -394,7 +403,7 @@ public boolean truncate(Path f, long newLength) throws IOException {
         // Truncate is supported by default.
         if (this.isPosixBucket) {
             throw new UnsupportedOperationException(
-                "The posix bucket does not support the truncate operation through S3 gateway.");
+                    "The posix bucket does not support the truncate operation through S3 gateway.");
         }

         // Not supported when client-side encryption is enabled in the configuration.
@@ -476,19 +485,19 @@ public FSDataOutputStream create(Path f, FsPermission permission,
                     throw new FileAlreadyExistsException("Directory already exists: " + f);
                 }
             } catch (FileNotFoundException ignore) {
-                // NOTE: It was previously assumed that a non-empty object whose key ends with / could be uploaded through the COS SDK or API.
-                // In that case, successfully creating a new object under that file prefix without reporting an error would violate file system semantics.
-                // The check also helped guarantee a complete directory structure, but it clearly amplified metadata query requests.
-                // In practice, content objects ending with / rarely occur, and even if one exists it is not overwritten or lost (the new key is effectively one of its common prefixes; the original object remains in COS),
-                // so the check was removed to improve performance.
-                // validatePath(f)
+                // NOTE: It was previously assumed that a non-empty object whose key ends with / could be uploaded through the COS SDK or API.
+                // In that case, successfully creating a new object under that file prefix without reporting an error would violate file system semantics.
+                // The check also helped guarantee a complete directory structure, but it clearly amplified metadata query requests.
+                // In practice, content objects ending with / rarely occur, and even if one exists it is not overwritten or lost (the new key is effectively one of its common prefixes; the original object remains in COS),
+                // so the check was removed to improve performance.
+                // validatePath(f)
             }
         }

         Path absolutePath = makeAbsolute(f);
         String key = pathToKey(absolutePath);
         if (this.getConf().getBoolean(CosNConfigKeys.COSN_FLUSH_ENABLED,
-            CosNConfigKeys.DEFAULT_COSN_FLUSH_ENABLED)) {
+                CosNConfigKeys.DEFAULT_COSN_FLUSH_ENABLED)) {
             // Need to support the synchronous flush.
             return new FSDataOutputStream(
                 new CosNExtendedFSDataOutputStream(this.getConf(), nativeStore, key,
@@ -591,7 +600,7 @@ private void internalRecursiveDelete(String key, int listMaxLength) throws IOExc
             priorLastKey = listing.getPriorLastKey();

             if (this.operationCancellingStatusProviderThreadLocal.get() != null
-                && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
+                    && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
                 LOG.warn("The delete operation is cancelled. key: {}.", key);
                 throw new IOException("The delete operation is cancelled. key: " + key);
             }
@@ -712,7 +721,7 @@ private FileStatus getFileStatus(Path f, Set probes) throws
      * Otherwise, return it as a file.
      */
     private FileStatus getFileStatusHelper(FileMetadata meta, Path absolutePath, String key,
-        String headRequestId) throws IOException {
+            String headRequestId) throws IOException {
         if (directoryFirstEnabled && meta.getLength() == 0) {
             if (!key.endsWith(PATH_DELIMITER)) {
                 key += PATH_DELIMITER;
@@ -732,6 +741,7 @@

     /**
      * Query the directory via the list interface.
+     *
      * @return null if the directory does not exist
      */
     private FileStatus getFileStatusByList(String key, Path absolutePath, String headRequestId) throws IOException {
@@ -833,7 +843,7 @@ public FileStatus[] listStatus(Path f) throws IOException {
             priorLastKey = listing.getPriorLastKey();

             if (this.operationCancellingStatusProviderThreadLocal.get() != null
-                && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
+                    && this.operationCancellingStatusProviderThreadLocal.get().isCancelled()) {
                 LOG.warn("The list operation is cancelled. key: {}.", key);
                 throw new IOException("The list operation is cancelled. key: " + key);
             }
@@ -1011,7 +1021,7 @@ public FSDataInputStream open(Path f, int bufferSize, FileStatus fileStatus) thr
         String key = pathToKey(absolutePath);
         return new FSDataInputStream(new CosNFSBufferedFSInputStream(
             new CosNFSInputStream(this.getConf(), nativeStore, statistics, key,
-                fileStatus, this.boundedIOThreadPool),
+                fileStatus, this.boundedIOThreadPool, this.localPageCache),
             bufferSize));
     }

@@ -1461,22 +1471,22 @@ public void createSymlink(Path target, Path link, boolean createParent)
         }

         try {
-            FileStatus parentStatus = this.getFileStatus(link.getParent());
-            if (!parentStatus.isDirectory()) {
-                throw new ParentNotDirectoryException(
-                    String.format("The parent path of the symlink [%s] is not a directory.", link));
-            }
+            FileStatus parentStatus = this.getFileStatus(link.getParent());
+            if (!parentStatus.isDirectory()) {
+                throw new ParentNotDirectoryException(
+                        String.format("The parent path of the symlink [%s] is not a directory.", link));
+            }
         } catch (FileNotFoundException parentDirNotFoundException) {
-            if (createParent) {
-                LOG.debug("The parent directory of the symlink [{}] does not exist, " +
-                    "creating it.", link.getParent());
-                if (!this.mkdirs(link.getParent())) {
-                    throw new IOException(String.format(
-                        "Failed to create the parent directory of the symlink [%s].", link));
+            if (createParent) {
+                LOG.debug("The parent directory of the symlink [{}] does not exist, " +
+                        "creating it.", link.getParent());
+                if (!this.mkdirs(link.getParent())) {
+                    throw new IOException(String.format(
+                            "Failed to create the parent directory of the symlink [%s].", link));
+                }
+            } else {
+                throw parentDirNotFoundException;
             }
-            } else {
-                throw parentDirNotFoundException;
-            }
         }

         try {
@@ -1529,9 +1539,9 @@ public FileStatus getFileLinkStatus(final Path f)

     @Override
     public boolean supportsSymlinks() {
-        return this.getConf().getBoolean(
-            CosNConfigKeys.COSN_SUPPORT_SYMLINK_ENABLED,
-            CosNConfigKeys.DEFAULT_COSN_SUPPORT_SYMLINK_ENABLED);
+        return this.getConf().getBoolean(
+                CosNConfigKeys.COSN_SUPPORT_SYMLINK_ENABLED,
+                CosNConfigKeys.DEFAULT_COSN_SUPPORT_SYMLINK_ENABLED);
     }

     /**
@@ -1599,6 +1609,9 @@ public void close() throws IOException {
                 LOG.error("boundedIOThreadPool shutdown interrupted.", e);
             }
             BufferPool.getInstance().close();
+            if (null != this.localPageCache) {
+                this.localPageCache.close();
+            }
             super.close();
         } finally {
             // copy and delete involve metadata operations, so they are released last.

diff --git a/src/main/java/org/apache/hadoop/fs/CosNUtils.java b/src/main/java/org/apache/hadoop/fs/CosNUtils.java
index 825de03e..ae3555e2 100644
--- a/src/main/java/org/apache/hadoop/fs/CosNUtils.java
+++ b/src/main/java/org/apache/hadoop/fs/CosNUtils.java
@@ -331,6 +331,46 @@ public static Path keyToPath(String key, String pathDelimiter) {
         }
     }

+    /**
+     * Create a temporary directory for buffer files.
+     *
+     * @param tmpDir path of the temporary directory to create.
+     * @return the created (or already existing) directory.
+     * @throws IOException if the directory cannot be created.
+     */
+    public static File createTempDir(String tmpDir) throws IOException {
+        File file = new File(tmpDir);
+        if (!file.exists()) {
+            LOG.debug("Temp directory: [{}] does not exist. Create it first.",
+                file);
+            if (file.mkdirs()) {
+                if (!file.setWritable(true, false) || !file.setReadable(true, false)
+                    || !file.setExecutable(true, false)) {
+                    LOG.warn("Set the buffer dir: [{}]'s permission [writable,"
+                        + "readable, executable] failed.", file);
+                }
+                LOG.debug("Temp directory: [{}] is created successfully.",
+                    file.getAbsolutePath());
+            } else {
+                // mkdirs can fail because another process created the directory at the same
+                // time, so check again and only fail if it still does not exist.
+                if (!file.exists()) {
+                    throw new IOException("Temp directory: " + file.getAbsolutePath() + " could not be created");
+                }
+            }
+        } else {
+            LOG.debug("Temp directory: {} already exists.", file.getAbsolutePath());
+            // Check if the tmp dir has read and write permissions.
+            if (!CosNUtils.checkDirectoryRWPermissions(tmpDir)) {
+                String exceptionMsg = String.format("The tmp dir does not have read or write permissions. dir: %s", tmpDir);
+                throw new IllegalArgumentException(exceptionMsg);
+            }
+        }
+
+        return file;
+    }
+
     public static boolean checkDirectoryRWPermissions(String directoryPath) {
         java.nio.file.Path path = Paths.get(directoryPath);
@@ -347,4 +387,27 @@ public static boolean checkDirectoryRWPermissions(String directoryPath) {

         return isReadable && isWritable;
     }
+
+    /**
+     * Clear the temporary directory.
+     * This method will delete all files and directories under the specified temporary directory.
+     *
+     * @param tmpDir path of the temporary directory to clear.
+     */
+    public static void clearTempDir(String tmpDir) {
+        File file = new File(tmpDir);
+        if (file.exists() && file.isDirectory()) {
+            File[] files = file.listFiles();
+            if (files != null) {
+                for (File f : files) {
+                    if (f.isFile()) {
+                        f.delete();
+                    } else if (f.isDirectory()) {
+                        clearTempDir(f.getAbsolutePath());
+                        f.delete();
+                    }
+                }
+            }
+        }
+    }
 }
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/Constants.java b/src/main/java/org/apache/hadoop/fs/cosn/Constants.java
index 87b74f86..dce4b56b 100644
--- a/src/main/java/org/apache/hadoop/fs/cosn/Constants.java
+++ b/src/main/java/org/apache/hadoop/fs/cosn/Constants.java
@@ -10,6 +10,9 @@ private Constants() {
     // Suffix for local cache file name
     public static final String BLOCK_TMP_FILE_SUFFIX = "_local_block_cache";

+    public static final String PAGE_TEMP_FILE_PREFIX = "cos_";
+    public static final String PAGE_TEMP_FILE_SUFFIX = "_local_page_cache";
+
     // Crc32c server response header key
     public static final String CRC32C_RESP_HEADER = "x-cos-hash-crc32c";
     // Crc32c agent request header key

diff --git a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java
index 4077861f..673b5de2 100644
--- a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java
+++ b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNByteBuffer.java
@@ -44,6 +44,15 @@ public CosNByteBuffer put(byte[] src, int offset, int length) throws IOException
         return this;
     }

+    public CosNByteBuffer putInt(int value) throws IOException {
+        if (this.byteBuffer.remaining() < Integer.BYTES) {
+            throw new IOException("Not enough space remaining in the buffer for an int.");
+        }
+        this.byteBuffer.putInt(value);
+        this.nextWritePosition = this.byteBuffer.position();
+        return this;
+    }
+
     public byte get() {
         return this.byteBuffer.get();
     }
@@ -53,6 +62,14 @@ public CosNByteBuffer get(byte[] dst, int offset, int length) {
         return this;
     }

+    public int getInt() {
+        return this.byteBuffer.getInt();
+    }
+
+    public int getInt(int index) {
+        return this.byteBuffer.getInt(index);
+    }
+
     public int capacity() {
         return this.byteBuffer.capacity();
     }

diff --git a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java
index 737e67c1..594691cf 100644
--- a/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java
+++ b/src/main/java/org/apache/hadoop/fs/cosn/buffer/CosNMappedBufferFactory.java
@@ -24,48 +24,12 @@ public class CosNMappedBufferFactory implements CosNBufferFactory {

     public CosNMappedBufferFactory(String[] tmpDirList, boolean deleteOnExit) throws IOException {
         for (String tmpDir : tmpDirList) {
-            File createDir = CosNMappedBufferFactory.createDir(tmpDir);
+            File createDir = CosNUtils.createTempDir(tmpDir);
             tmpDirs.add(createDir);
         }
         this.deleteOnExit = deleteOnExit;
     }

-    private static File createDir(String tmpDir) throws IOException {
-        File file = new File(tmpDir);
-        if (!file.exists()) {
-            LOG.debug("Buffer dir: [{}] does not exist. Create it first.",
-                file);
-            if (file.mkdirs()) {
-                if (!file.setWritable(true, false) || !file.setReadable(true, false)
-                    || !file.setExecutable(true, false)) {
-                    LOG.warn("Set the buffer dir: [{}]'s permission [writable,"
-                        + "readable, executable] failed.", file);
-                }
-                LOG.debug("Buffer dir: [{}] is created successfully.",
-                    file.getAbsolutePath());
-            } else {
-                // Once again, check if it has been created successfully.
-                // Prevent problems created by multiple processes at the same
-                // time.
-                if (!file.exists()) {
-                    throw new IOException("buffer dir:" + file.getAbsolutePath()
-                        + " is created unsuccessfully");
-                }
-            }
-        } else {
-            LOG.debug("buffer dir: {} already exists.",
-                file.getAbsolutePath());
-            // Check whether you have read and write permissions for the directory during initialization
-            if (!CosNUtils.checkDirectoryRWPermissions(tmpDir)) {
-                String exceptionMsg = String.format("The tmp dir does not have read or write permissions." +
-                    "dir: %s", tmpDir);
-                throw new IllegalArgumentException(exceptionMsg);
-            }
-        }
-
-        return file;
-    }
-
     @Override
     public CosNByteBuffer create(int size) {
         return this.create(Constants.BLOCK_TMP_FILE_PREFIX,
@@ -89,7 +53,7 @@ public CosNByteBuffer create(String prefix, String suffix, int size) {
             LOG.warn("The tmp dir does not exist.");
             // try to create the tmp directory.
             try {
-                CosNMappedBufferFactory.createDir(tmpDir.getAbsolutePath());
+                CosNUtils.createTempDir(tmpDir.getAbsolutePath());
             } catch (IOException e) {
                 LOG.error("Try to create the tmp dir [{}] failed.", tmpDir.getAbsolutePath(), e);
                 return null;

diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/LRURangeCache.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/LRURangeCache.java
new file mode 100644
index 00000000..48bb2b3c
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/LRURangeCache.java
@@ -0,0 +1,158 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import com.google.common.collect.BoundType;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeMap;
+import com.google.common.collect.TreeRangeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+public class LRURangeCache<K extends Comparable<K>, V> extends RangeCache<K, V> {
+    private static final Logger LOG = LoggerFactory.getLogger(LRURangeCache.class);
+
+    private final int capacity;
+    private final RangeMap<K, V> rangeMap;
+    private final LinkedHashMap<Range<K>, Boolean> lruMap;
+
+    public LRURangeCache(int capacity) {
+        super(capacity);
+        this.capacity = capacity;
+        this.rangeMap = TreeRangeMap.create();
+        this.lruMap = new LinkedHashMap<Range<K>, Boolean>(capacity, 0.75f, true) {
+            @Override
+            protected boolean removeEldestEntry(Map.Entry<Range<K>, Boolean> eldest) {
+                if (size() > LRURangeCache.this.capacity) {
+                    V removedValue = rangeMap.asMapOfRanges().get(eldest.getKey());
+                    rangeMap.remove(eldest.getKey());
+                    if (removalListener != null) {
+                        LOG.debug("Removing key {} from LRURangeCache.", eldest.getKey());
+                        RemovalNotification<Entry<K>, V> notification = new RemovalNotification<>(
+                                convertRangeToEntry(eldest.getKey()), removedValue);
+                        removalListener.onRemoval(notification);
+                    }
+                    return true;
+                }
+                return false;
+            }
+        };
+    }
+
+    // Add a range and its associated value.
+    @Override
+    public void put(RangeCache.Entry<K> entry, V value) {
+        Range<K> range = convertRangeHelper(entry);
+        rangeMap.put(range, value);
+        lruMap.put(range, Boolean.TRUE);
+    }
+
+    // Get the value of the range containing the key.
+    @Override
+    public V get(K key) {
+        V value = rangeMap.get(key);
+        if (value != null) {
+            // Refresh the LRU order.
+            Range<K> entry = rangeMap.asMapOfRanges().keySet().stream().filter(r -> r.contains(key)).findFirst().orElse(null);
+            if (entry != null) {
+                lruMap.get(entry);
+            }
+        }
+        return value;
+    }
+
+    @Override
+    public Entry<K> getEntry(K key) {
+        Map.Entry<Range<K>, V> entry = rangeMap.getEntry(key);
+        return entry != null ? convertRangeToEntry(entry.getKey()) : null;
+    }
+
+    @Override
+    public boolean contains(K key) {
+        return rangeMap.get(key) != null;
+    }
+
+    @Override
+    public void remove(K key) {
+        Map.Entry<Range<K>, V> entry = rangeMap.getEntry(key);
+        if (entry != null) {
+            Range<K> range = entry.getKey();
+            V value = entry.getValue();
+            rangeMap.remove(range);
+            lruMap.remove(range);
+            if (removalListener != null) {
+                RemovalNotification<Entry<K>, V> notification = new RemovalNotification<>(
+                        convertRangeToEntry(range), value);
+                removalListener.onRemoval(notification);
+            }
+        }
+    }
+
+    // Get the cache size.
+    @Override
+    public int size() {
+        return lruMap.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return lruMap.isEmpty();
+    }
+
+    @Override
+    public void clear() {
+        // Notify the removal listener entry by entry, then drop all the entries.
+        for (Map.Entry<Range<K>, Boolean> entry : lruMap.entrySet()) {
+            Range<K> range = entry.getKey();
+            V value = rangeMap.asMapOfRanges().get(range);
+            if (value != null && removalListener != null) {
+                LOG.debug("Clearing key {} from LRURangeCache.", range);
+                RemovalNotification<Entry<K>, V> notification = new RemovalNotification<>(
+                        this.convertRangeToEntry(range), value);
+                removalListener.onRemoval(notification);
+            }
+        }
+        rangeMap.clear();
+        lruMap.clear();
+    }
+
+    @Override
+    public boolean containsOverlaps(RangeCache.Entry<K> entry) {
+        Range<K> range = convertRangeHelper(entry);
+        return rangeMap.asMapOfRanges().keySet().stream().anyMatch(r -> r.isConnected(range) && !r.intersection(range).isEmpty());
+    }
+
+    private Range<K> convertRangeHelper(RangeCache.Entry<K> entry) {
+        switch (entry.getRangeType()) {
+            case CLOSED_OPEN:
+                return Range.closedOpen(entry.getLowerEndpoint(), entry.getUpperEndpoint());
+            case OPEN_OPEN:
+                return Range.open(entry.getLowerEndpoint(), entry.getUpperEndpoint());
+            case OPEN_CLOSED:
+                return Range.openClosed(entry.getLowerEndpoint(), entry.getUpperEndpoint());
+            case CLOSED_CLOSED:
+                return Range.closed(entry.getLowerEndpoint(), entry.getUpperEndpoint());
+            default:
+                throw new IllegalArgumentException("Unsupported range type: " + entry.getRangeType());
+        }
+    }
+
+    private RangeCache.Entry<K> convertRangeToEntry(Range<K> range) {
+        if (range.lowerBoundType() == BoundType.OPEN && range.upperBoundType() == BoundType.OPEN) {
+            return RangeCache.Entry.open(range.lowerEndpoint(), range.upperEndpoint());
+        }
+
+        if (range.lowerBoundType() == BoundType.CLOSED && range.upperBoundType() == BoundType.OPEN) {
+            return RangeCache.Entry.closedOpen(range.lowerEndpoint(), range.upperEndpoint());
+        }
+
+        if (range.lowerBoundType() == BoundType.OPEN && range.upperBoundType() == BoundType.CLOSED) {
+            return RangeCache.Entry.openClosed(range.lowerEndpoint(), range.upperEndpoint());
+        }
+
+        if (range.lowerBoundType() == BoundType.CLOSED && range.upperBoundType() == BoundType.CLOSED) {
+            return RangeCache.Entry.closed(range.lowerEndpoint(), range.upperEndpoint());
+        }
+
+        throw new IllegalArgumentException(
+                String.format("Unsupported range: %s, lowerBoundType: %s, upperBoundType: %s.",
+                        range, range.lowerBoundType(), range.upperBoundType()));
+    }
+}
\ No newline at end of file
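A compact usage sketch of the class above (values are illustrative). This mirrors how LocalPageCache wires it up: byte ranges of a file map to page-slot indexes, and the removal listener reclaims a slot when its range is evicted:

    LRURangeCache<Long, Integer> cache = new LRURangeCache<>(2);
    cache.setRemovalListener(n ->
            System.out.println("evicted " + n.getKey().getLowerEndpoint() + " -> slot " + n.getValue()));
    cache.put(RangeCache.Entry.closedOpen(0L, 1024L), 0);
    cache.put(RangeCache.Entry.closedOpen(1024L, 2048L), 1);
    cache.get(100L);                                         // touch [0, 1024): it becomes most recently used
    cache.put(RangeCache.Entry.closedOpen(2048L, 3072L), 2); // evicts [1024, 2048); the listener frees slot 1
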
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/LocalPageCache.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/LocalPageCache.java
new file mode 100644
index 00000000..2cc9ae8d
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/LocalPageCache.java
@@ -0,0 +1,377 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CosNConfigKeys;
+import org.apache.hadoop.fs.CosNUtils;
+import org.apache.hadoop.fs.cosn.buffer.CosNBufferFactory;
+import org.apache.hadoop.fs.cosn.buffer.CosNByteBuffer;
+import org.apache.hadoop.fs.cosn.buffer.CosNMappedBufferFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+
+public class LocalPageCache implements PageCache {
+    private static final Logger LOG = LoggerFactory.getLogger(LocalPageCache.class);
+
+    private static volatile LocalPageCache instance;
+
+    private AtomicInteger referCount = new AtomicInteger(0);
+    private AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+    private Map<String, FileCacheEntry> fileCacheMap;
+    private int pageNum;
+    private int pageSize;
+    private String cacheDir;
+    private CosNBufferFactory cosNBufferFactory;
+    private Constructor<? extends RangeCache> rangeCacheConstructor;
+
+    private final class FileCacheEntry {
+        // The per-slot header stores the length of the page kept in the slot.
+        private static final int META_HEAD_SIZE = Integer.BYTES;
+        // Lock guarding this file cache entry.
+        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+        // Range index: maps byte ranges of the file to page slot indexes.
+        private final RangeCache<Long, Integer> rangeIndex;
+        // 0 means the slot is free, 1 means it is occupied.
+        private final byte[] pageIndexes;
+        private final CosNByteBuffer pagesBuffer;
+        private final AtomicBoolean isValid;
+
+        public FileCacheEntry() throws IOException {
+            this.pageIndexes = new byte[pageNum];
+            try {
+                this.rangeIndex = rangeCacheConstructor.newInstance(pageNum);
+            } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+                throw new IOException("Failed to create RangeCache instance", e);
+            }
+            this.rangeIndex.setRemovalListener(notification -> {
+                this.lock.writeLock().lock();
+                try {
+                    this.pageIndexes[notification.getValue()] = 0;
+                } finally {
+                    this.lock.writeLock().unlock();
+                }
+            });
+            pagesBuffer = cosNBufferFactory.create((META_HEAD_SIZE + pageSize) * pageNum);
+            this.isValid = new AtomicBoolean(true);
+        }
+
+        public void put(byte[] pageData, long offsetInFile) throws IOException {
+            if (pageData == null || pageData.length == 0) {
+                LOG.warn("Page is null or its content is empty. Skip putting it into the cache.");
+                return;
+            }
+
+            if (pageData.length > pageSize) {
+                LOG.warn("Page size is greater than the page slot size of the cache. Skip putting it.");
+                return;
+            }
+
+            this.lock.writeLock().lock();
+            try {
+                // Check validity under the lock so the unlock in the finally block always runs.
+                if (!this.isValid.get()) {
+                    LOG.warn("FileCacheEntry is not valid, cannot put page data.");
+                    return;
+                }
+                if (this.rangeIndex.contains(offsetInFile)) {
+                    LOG.debug("Skipping page as it is already in use");
+                    return;
+                }
+                // Find a free slot.
+                int slotIndex = -1;
+                for (int i = 0; i < this.pageIndexes.length; i++) {
+                    if (this.pageIndexes[i] == 0) {
+                        slotIndex = i;
+                        break;
+                    }
+                }
+                if (slotIndex == -1) {
+                    // All slots are occupied; the page is dropped. Slots are only reclaimed
+                    // when entries are removed or evicted from the range index.
+                    LOG.debug("No free slot available in the page cache.");
+                    return;
+                }
+                // Write the data.
+                this.pagesBuffer.position(slotIndex * (META_HEAD_SIZE + pageSize));
+                // Write the data length into the header.
+                this.pagesBuffer.putInt(pageData.length);
+                // Write the actual data.
+                this.pagesBuffer.put(pageData, 0, pageData.length);
+
+                // Update the page index and the range index.
+                this.pageIndexes[slotIndex] = 1;
+                this.rangeIndex.put(RangeCache.Entry.closedOpen(offsetInFile,
+                        offsetInFile + pageData.length), slotIndex);
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        }
+
+        public byte[] get(long offset) throws IOException {
+            // rangeIndex.get() updates the LRU order, so take the write lock even for reads.
+            this.lock.writeLock().lock();
+            try {
+                if (!this.isValid.get()) {
+                    LOG.warn("FileCacheEntry is not valid, cannot get page data.");
+                    return null;
+                }
+                if (!this.rangeIndex.contains(offset)) {
+                    LOG.debug("Page not found in cache for offset: {}", offset);
+                    return null;
+                }
+
+                int slotIndex = this.rangeIndex.get(offset);
+                RangeCache.Entry<Long> rangeEntry = this.rangeIndex.getEntry(offset);
+                int dataSize = this.pagesBuffer.getInt(slotIndex * (META_HEAD_SIZE + pageSize));
+                // The returned buffer holds offsetInFile in its header, followed by the actual data.
+                ByteBuffer data = ByteBuffer.allocate(Long.BYTES + dataSize);
+                data.clear();
+                data.putLong(rangeEntry.getLowerEndpoint());
+                this.pagesBuffer.position(slotIndex * (META_HEAD_SIZE + pageSize) + META_HEAD_SIZE);
+                this.pagesBuffer.get(data.array(), Long.BYTES, dataSize);
+                return data.array();
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        }
+
+        public void remove(long offset) throws IOException {
+            this.lock.writeLock().lock();
+            try {
+                if (!this.isValid.get()) {
+                    LOG.warn("FileCacheEntry is not valid, cannot remove page data.");
+                    return;
+                }
+                if (!this.rangeIndex.contains(offset)) {
+                    LOG.warn("Page with offset {} not found in cache.", offset);
+                    return;
+                }
+                int slotIndex = this.rangeIndex.get(offset);
+                this.rangeIndex.remove(offset); // remove from the range index
+                this.pageIndexes[slotIndex] = 0; // mark the slot as free
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        }
+
+        public void release() throws IOException {
+            this.lock.writeLock().lock();
+            try {
+                if (!this.isValid.get()) {
+                    LOG.warn("FileCacheEntry is already released, nothing to do.");
+                    return;
+                }
+                this.isValid.set(false);
+                this.rangeIndex.clear();
+                // Reset the slot index.
+                Arrays.fill(this.pageIndexes, (byte) 0);
+                this.pagesBuffer.close();
+            } finally {
+                this.lock.writeLock().unlock();
+            }
+        }
+
+        public boolean contains(long offset) {
+            this.lock.readLock().lock();
+            try {
+                if (!this.isValid.get()) {
+                    LOG.warn("FileCacheEntry is not valid, cannot check contains.");
+                    return false;
+                }
+                return this.rangeIndex.contains(offset);
+            } finally {
+                this.lock.readLock().unlock();
+            }
+        }
+
+    }
+
+    private LocalPageCache() {
+    }
+
+    // Get the singleton instance.
+    public static LocalPageCache getInstance() {
+        if (null == instance) {
+            synchronized (LocalPageCache.class) {
+                if (null == instance) {
+                    LOG.info("Created new LocalPageCache instance.");
+                    instance = new LocalPageCache();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public synchronized void initialize(Configuration conf) throws IOException {
+        if (isInitialized.get()) {
+            LOG.info("Cache is already initialized, incrementing referCount. Current count: {}", referCount.get());
+            this.referCount.incrementAndGet();
+            return;
+        }
+        int fileNumber = conf.getInt(CosNConfigKeys.COSN_READ_PAGE_CACHE_FILE_NUM,
+                CosNConfigKeys.DEFAULT_READ_PAGE_CACHE_FILE_NUM);
+        this.pageNum = conf.getInt(CosNConfigKeys.COSN_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM,
+                CosNConfigKeys.DEFAULT_READ_FRAGMENT_CACHE_EACH_FILE_FRAGMENT_NUM);
+        this.fileCacheMap = new ConcurrentHashMap<>(fileNumber);
+        this.pageSize = conf.getInt(CosNConfigKeys.READ_AHEAD_BLOCK_SIZE_KEY, (int) CosNConfigKeys.DEFAULT_READ_AHEAD_BLOCK_SIZE);
+        this.cacheDir = conf.get(CosNConfigKeys.COSN_READ_PAGE_CACHE_DIR, CosNConfigKeys.DEFAULT_READ_PAGE_CACHE_DIR);
+        this.cosNBufferFactory = new CosNMappedBufferFactory(new String[]{this.cacheDir}, true);
+        this.rangeCacheConstructor = buildRangeCacheConstructor(
+                conf.get(CosNConfigKeys.COSN_READ_PAGE_CACHE_RANGE_CACHE_IMPL,
+                        CosNConfigKeys.DEFAULT_READ_PAGE_CACHE_RANGE_CACHE_IMPL));
+
+        // Make sure the cache directory exists: create it if absent; if it already
+        // exists, clear any leftover data in it.
+        CosNUtils.createTempDir(this.cacheDir);
+        CosNUtils.clearTempDir(this.cacheDir);
+
+        // Count the first user too, so close() decrements symmetrically.
+        this.referCount.incrementAndGet();
+        isInitialized.set(true);
+    }
+
+    @Override
+    public void put(Page page) throws IOException {
+        this.checkInitialize();
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.computeIfAbsent(page.getFilePath(),
+                new Function<String, FileCacheEntry>() {
+                    @Override
+                    public @Nullable FileCacheEntry apply(@Nullable String key) {
+                        try {
+                            return new FileCacheEntry();
+                        } catch (IOException e) {
+                            throw new RuntimeException(e);
+                        }
+                    }
+                });
+        fileCacheEntry.put(page.getContent(), page.getOffsetInFile());
+    }
+
+    @Override
+    public Page get(String filePath, long startOffsetInFile) throws IOException {
+        this.checkInitialize();
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.get(filePath);
+        if (fileCacheEntry == null) {
+            return null;
+        }
+
+        byte[] pageData = fileCacheEntry.get(startOffsetInFile);
+        if (null == pageData) {
+            return null;
+        }
+
+        ByteBuffer pageDataBuffer = ByteBuffer.wrap(pageData);
+        long offsetInFile = pageDataBuffer.getLong();
+        byte[] realData = new byte[pageDataBuffer.remaining()];
+        pageDataBuffer.get(realData);
+        return new Page(filePath, offsetInFile, realData);
+    }
+
+    @Override
+    public boolean contains(String filePath, long startOffsetInFile) throws IOException {
+        this.checkInitialize();
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.get(filePath);
+        if (fileCacheEntry == null || !fileCacheEntry.contains(startOffsetInFile)) {
+            LOG.debug("Page not found in cache for file: {}", filePath);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void remove(String filePath, long startOffsetInFile) throws IOException {
+        this.checkInitialize();
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.get(filePath);
+        if (fileCacheEntry == null) {
+            LOG.warn("FileCacheEntry for file {} not found, cannot remove page.", filePath);
+            return;
+        }
+        fileCacheEntry.remove(startOffsetInFile);
+    }
+
+    @Override
+    public void remove(String filePath) throws IOException {
+        this.checkInitialize();
+
+        FileCacheEntry fileCacheEntry = this.fileCacheMap.remove(filePath);
+        if (fileCacheEntry == null) {
+            LOG.warn("FileCacheEntry for file {} not found, cannot remove.", filePath);
+            return;
+        }
+
+        try {
+            fileCacheEntry.release();
+        } catch (IOException e) {
+            LOG.error("Error releasing FileCacheEntry for file: {}", filePath, e);
+        }
+    }
+
+    @Override
+    public void clear() throws IOException {
+        this.checkInitialize();
+        Set<String> keys = new HashSet<>(this.fileCacheMap.keySet());
+        for (String key : keys) {
+            try {
+                this.remove(key);
+            } catch (IOException e) {
+                LOG.error("Error clearing cache for file: {}", key, e);
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!this.isInitialized.get()) {
+            LOG.warn("LocalPageCache is not initialized, nothing to close.");
+            return;
+        }
+
+        if (this.referCount.decrementAndGet() > 0) {
+            LOG.info("Decremented referCount, current count: {}", this.referCount.get());
+            return; // other holders are still using the cache
+        }
+
+        LOG.info("Closing LocalPageCache, clearing all caches.");
+        try {
+            this.clear();
+        } catch (IOException e) {
+            LOG.error("Error clearing LocalPageCache during close.", e);
+        } finally {
+            LOG.info("LocalPageCache closed successfully.");
+        }
+        this.isInitialized.set(false);
+    }
+
+    private void checkInitialize() throws IOException {
+        if (!this.isInitialized.get()) {
+            throw new IOException("The LocalPageCache has not been initialized.");
+        }
+    }
+
+    private static Constructor<? extends RangeCache> buildRangeCacheConstructor(String className) throws IOException {
+        try {
+            Class<?> clazz = Class.forName(className);
+            if (RangeCache.class.isAssignableFrom(clazz)) {
+                return clazz.asSubclass(RangeCache.class).getConstructor(int.class);
+            } else {
+                throw new IOException("Class " + className + " is not a subclass of RangeCache.");
+            }
+        } catch (ClassNotFoundException e) {
+            throw new IOException("Failed to find RangeCache implementation: " + className, e);
+        } catch (NoSuchMethodException e) {
+            throw new IOException("No suitable constructor found for RangeCache implementation: " + className, e);
+        }
+    }
+}
\ No newline at end of file

diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/PageCache.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageCache.java
new file mode 100644
index 00000000..f6a21a11
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageCache.java
@@ -0,0 +1,40 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+public interface PageCache {
+    class Page {
+        private final String filePath;
+        private final long offsetInFile;
+        private final byte[] content;
+
+        public Page(String filePath, long offsetInFile, byte[] content) {
+            this.filePath = filePath;
+            this.offsetInFile = offsetInFile;
+            this.content = content;
+        }
+
+        public String getFilePath() {
+            return filePath;
+        }
+
+        public long getOffsetInFile() {
+            return offsetInFile;
+        }
+
+        public byte[] getContent() {
+            return content;
+        }
+    }
+
+    void initialize(Configuration configuration) throws IOException;
+    void put(Page page) throws IOException;
+    Page get(String filePath, long startOffsetInFile) throws IOException;
+    boolean contains(String filePath, long startOffsetInFile) throws IOException;
+    void remove(String filePath, long startOffsetInFile) throws IOException;
+    void remove(String filePath) throws IOException;
+    void clear() throws IOException;
+    void close();
+}
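Condensed into one flow, this is how CosNFSInputStream drives the interface above (the key and offset are placeholders; conf and pageBytes stand for a Hadoop Configuration and an evicted read-ahead block):

    PageCache cache = LocalPageCache.getInstance();
    cache.initialize(conf);                                   // ref-counted: one call per file system instance
    cache.put(new PageCache.Page("cosn://bucket/key", 0L, pageBytes));
    if (cache.contains("cosn://bucket/key", 0L)) {
        PageCache.Page page = cache.get("cosn://bucket/key", 0L);
        // page.getOffsetInFile() marks the block start; page.getContent() holds the block bytes
    }
    cache.remove("cosn://bucket/key");                        // on stream close: drop all pages of this file
    cache.close();                                            // last close clears every remaining entry
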
diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/PageFaultException.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageFaultException.java
new file mode 100644
index 00000000..39deb40a
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageFaultException.java
@@ -0,0 +1,19 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import java.io.IOException;
+
+public class PageFaultException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public PageFaultException(String message) {
+        super(message);
+    }
+
+    public PageFaultException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public PageFaultException(Throwable cause) {
+        super(cause);
+    }
+}

diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/PageOverlapException.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageOverlapException.java
new file mode 100644
index 00000000..79b5cb91
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/PageOverlapException.java
@@ -0,0 +1,15 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+public class PageOverlapException extends PageFaultException {
+    public PageOverlapException(String message) {
+        super(message);
+    }
+
+    public PageOverlapException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public PageOverlapException(Throwable cause) {
+        super(cause);
+    }
+}

diff --git a/src/main/java/org/apache/hadoop/fs/cosn/cache/RangeCache.java b/src/main/java/org/apache/hadoop/fs/cosn/cache/RangeCache.java
new file mode 100644
index 00000000..d507a38a
--- /dev/null
+++ b/src/main/java/org/apache/hadoop/fs/cosn/cache/RangeCache.java
@@ -0,0 +1,131 @@
+package org.apache.hadoop.fs.cosn.cache;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+public abstract class RangeCache<K extends Comparable<K>, V> {
+    public static final class Entry<K extends Comparable<K>> implements Serializable {
+        private static final long serialVersionUID = 5916455377642165613L;
+
+        public enum RangeType {
+            CLOSED_OPEN, OPEN_CLOSED, OPEN_OPEN, CLOSED_CLOSED
+        }
+
+        public static <K extends Comparable<K>> Entry<K> closed(K lowerEndpoint, K upperEndpoint) {
+            return new Entry<>(lowerEndpoint, upperEndpoint, RangeType.CLOSED_CLOSED);
+        }
+
+        public static <K extends Comparable<K>> Entry<K> closedOpen(K lowerEndpoint, K upperEndpoint) {
+            return new Entry<>(lowerEndpoint, upperEndpoint, RangeType.CLOSED_OPEN);
+        }
+
+        public static <K extends Comparable<K>> Entry<K> openClosed(K lowerEndpoint, K upperEndpoint) {
+            return new Entry<>(lowerEndpoint, upperEndpoint, RangeType.OPEN_CLOSED);
+        }
+
+        public static <K extends Comparable<K>> Entry<K> open(K lowerEndpoint, K upperEndpoint) {
+            return new Entry<>(lowerEndpoint, upperEndpoint, RangeType.OPEN_OPEN);
+        }
+
+        private final K lowerEndpoint;
+        private final K upperEndpoint;
+        private final RangeType rangeType;
+
+        private Entry(K lowerEndpoint, K upperEndpoint, RangeType rangeType) {
+            this.lowerEndpoint = lowerEndpoint;
+            this.upperEndpoint = upperEndpoint;
+            this.rangeType = rangeType;
+        }
+
+        public K getLowerEndpoint() {
+            return lowerEndpoint;
+        }
+
+        public K getUpperEndpoint() {
+            return upperEndpoint;
+        }
+
+        public RangeType getRangeType() {
+            return rangeType;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof Entry)) return false;
+            Entry<?> entry = (Entry<?>) o;
+            return Objects.equals(lowerEndpoint, entry.lowerEndpoint) && Objects.equals(upperEndpoint, entry.upperEndpoint) && rangeType == entry.rangeType;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(lowerEndpoint, upperEndpoint, rangeType);
+        }
+    }
+
+    protected final int capacity;
+    protected RemovalListener<Entry<K>, V> removalListener = null;
+
+    public static class RemovalNotification<RANGE_KEY, V> implements Map.Entry<RANGE_KEY, V>, Serializable {
+        private static final long serialVersionUID = 2457865473217506034L;
+
+        private final RANGE_KEY rangeKey;
+        private final V value;
+
+        RemovalNotification(RANGE_KEY rangeKey, V value) {
+            this.rangeKey = rangeKey;
+            this.value = value;
+        }
+
+        @Override
+        public RANGE_KEY getKey() {
+            return this.rangeKey;
+        }
+
+        @Override
+        public V getValue() {
+            return this.value;
+        }
+
+        @Override
+        public V setValue(V value) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+    public interface RemovalListener<RANGE_KEY, V> {
+        void onRemoval(RemovalNotification<RANGE_KEY, V> notification);
+    }
+
+    protected RangeCache(int capacity) {
+        this.capacity = capacity;
+    }
+
+    public void setRemovalListener(RemovalListener<Entry<K>, V> removalListener) {
+        this.removalListener = removalListener;
+    }
+
+    public abstract void put(Entry<K> entry, V value);
+
+    public abstract V get(K key);
+
+    public abstract Entry<K> getEntry(K key);
+
+    public abstract boolean contains(K key);
+
+    public abstract void remove(K key);
+
+    public abstract int size();
+
+    public abstract boolean isEmpty();
+
+    public abstract void clear();
+
+    public abstract boolean containsOverlaps(Entry<K> entry);
+
+    protected void notifyRemoval(RemovalNotification<Entry<K>, V> notification) {
+        if (removalListener != null) {
+            removalListener.onRemoval(notification);
+        }
+    }
+}
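Since the range-index implementation is pluggable, any RangeCache subclass exposing an (int capacity) constructor can be selected through configuration; LocalPageCache.buildRangeCacheConstructor() loads it reflectively. A minimal sketch (the property value below simply restates the default):

    Configuration conf = new Configuration();
    conf.setBoolean("fs.cosn.read.page.cache.enabled", true);
    conf.set("fs.cosn.read.page.cache.range.cache.impl",
            "org.apache.hadoop.fs.cosn.cache.LRURangeCache");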