From c18acfc385adfb7095286b093c63772a6a3d47e3 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 17 Apr 2025 16:43:28 +0800 Subject: [PATCH 01/10] Pipe/Load: Implement multi-disk awareness of multiple file systems during file copying and moving --- .../thrift/IoTDBDataNodeReceiver.java | 34 +---- .../plan/analyze/load/LoadTsFileAnalyzer.java | 73 ++-------- .../storageengine/dataregion/DataRegion.java | 55 +++---- .../load/active/ActiveLoadUtil.java | 134 ++++++++++++++++++ .../load/disk/ILoadDiskSelector.java | 28 +++- ...heritSystemMultiDisksStrategySelector.java | 34 ++--- .../load/disk/MinIOSelector.java | 78 ++++++---- 7 files changed, 253 insertions(+), 183 deletions(-) create mode 100644 iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 513c7b0790f5..4c897f455215 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -37,8 +37,6 @@ import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver; import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler; import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; -import org.apache.iotdb.commons.utils.FileUtils; -import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.conf.IoTDBConfig; @@ -95,6 +93,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement; import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil; import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; import org.apache.iotdb.db.tools.schema.SRStatementGenerator; @@ -546,33 +545,10 @@ protected TSStatus loadFileV2( private TSStatus loadTsFileAsync(final String dataBaseName, final List absolutePaths) throws IOException { - final String loadActiveListeningPipeDir = IOTDB_CONFIG.getLoadActiveListeningPipeDir(); - if (Objects.isNull(loadActiveListeningPipeDir)) { - throw new PipeException("Load active listening pipe dir is not set."); - } - - if (Objects.nonNull(dataBaseName)) { - final File targetDir = new File(loadActiveListeningPipeDir, dataBaseName); - return this.loadTsFileAsyncToTargetDir(targetDir, absolutePaths); - } - - return loadTsFileAsyncToTargetDir(new File(loadActiveListeningPipeDir), absolutePaths); - } - - private TSStatus loadTsFileAsyncToTargetDir( - final File targetDir, final List absolutePaths) throws IOException { - for (final String absolutePath : absolutePaths) { - if (absolutePath == null) { - continue; - } - final File sourceFile = new File(absolutePath); - if (!Objects.equals( - targetDir.getAbsolutePath(), sourceFile.getParentFile().getAbsolutePath())) { - RetryUtils.retryOnException( - () -> { - FileUtils.moveFileWithMD5Check(sourceFile, targetDir); - return null; - }); + for (String fileAbsolutePath : absolutePaths) { + if (!ActiveLoadUtil.loadTsFilesAsyncToActiveDir( + dataBaseName, new File(fileAbsolutePath), true)) { + throw new PipeException("Load active listening pipe dir is not set."); } } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index 498e678b4bca..8a04e04e4687 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -22,7 +22,6 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.auth.AuthException; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.utils.RetryUtils; import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.load.LoadAnalyzeException; @@ -50,6 +49,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils; +import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil; import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter; import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet; import org.apache.iotdb.db.utils.TimestampPrecisionUtils; @@ -81,8 +81,6 @@ import java.util.Objects; import java.util.Optional; -import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check; -import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check; import static org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED; import static org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.validateDatabaseName; import static org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS; @@ -279,69 +277,14 @@ private boolean checkBeforeAnalyzeFileByFile(IAnalysis analysis) { } private boolean doAsyncLoad(final IAnalysis analysis) { - final String[] loadActiveListeningDirs = - IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs(); - String targetFilePath = null; - for (int i = 0, size = loadActiveListeningDirs == null ? 0 : loadActiveListeningDirs.length; - i < size; - i++) { - if (loadActiveListeningDirs[i] != null) { - targetFilePath = loadActiveListeningDirs[i]; - break; - } - } - if (targetFilePath == null) { - LOGGER.warn("Load active listening dir is not set. Will try sync load instead."); - return false; - } - - try { - if (Objects.nonNull(databaseForTableData)) { - loadTsFilesAsyncToTargetDir(new File(targetFilePath, databaseForTableData), tsFiles); - } else { - loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles); - } - } catch (Exception e) { - LOGGER.warn( - "Failed to async load tsfiles {} to target dir {}. Will try sync load instead.", - tsFiles, - targetFilePath, - e); - return false; - } - - analysis.setFinishQueryAfterAnalyze(true); - setRealStatement(analysis); - return true; - } - - private void loadTsFilesAsyncToTargetDir(final File targetDir, final List files) - throws IOException { - for (final File file : files) { - if (file == null) { - continue; - } - - loadTsFileAsyncToTargetDir(targetDir, file); - loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + ".resource")); - loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + ".mods")); - } - } - - private void loadTsFileAsyncToTargetDir(final File targetDir, final File file) - throws IOException { - if (!file.exists()) { - return; + if (ActiveLoadUtil.loadTsFileAsyncToActiveDir( + tsFiles, databaseForTableData, isDeleteAfterLoad)) { + analysis.setFinishQueryAfterAnalyze(true); + setRealStatement(analysis); + return true; } - RetryUtils.retryOnException( - () -> { - if (isDeleteAfterLoad) { - moveFileWithMD5Check(file, targetDir); - } else { - copyFileWithMD5Check(file, targetDir); - } - return null; - }); + LOGGER.info("Async Load has failed, and is now trying to load sync"); + return false; } private boolean doAnalyzeFileByFile(IAnalysis analysis) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 6fb11196d713..4e1d6ec30756 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -127,8 +127,6 @@ import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener; import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener; import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector; -import org.apache.iotdb.db.storageengine.load.disk.InheritSystemMultiDisksStrategySelector; -import org.apache.iotdb.db.storageengine.load.disk.MinIOSelector; import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; @@ -416,27 +414,12 @@ public DataRegion(String databaseName, String id) { } private void initDiskSelector() { - switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(config.getLoadDiskSelectStrategy())) { - case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY: - ordinaryLoadDiskSelector = new InheritSystemMultiDisksStrategySelector(); - break; - case MIN_IO_FIRST: - default: - ordinaryLoadDiskSelector = new MinIOSelector(); - } - - switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue( - config.getLoadDiskSelectStrategyForIoTV2AndPipe())) { - case MIN_IO_FIRST: - pipeAndIoTV2LoadDiskSelector = new MinIOSelector(); - break; - case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY: - pipeAndIoTV2LoadDiskSelector = new InheritSystemMultiDisksStrategySelector(); - break; - case INHERIT_LOAD: - default: - pipeAndIoTV2LoadDiskSelector = ordinaryLoadDiskSelector; - } + ordinaryLoadDiskSelector = + ILoadDiskSelector.initDiskSelector( + config.getLoadDiskSelectStrategy(), config.getTierDataDirs()[0]); + pipeAndIoTV2LoadDiskSelector = + ILoadDiskSelector.initDiskSelector( + config.getLoadDiskSelectStrategyForIoTV2AndPipe(), config.getTierDataDirs()[0]); } @Override @@ -3099,22 +3082,20 @@ private boolean loadTsFileToUnSequence( boolean isGeneratedByPipe) throws LoadFileException, DiskSpaceInsufficientException { final int targetTierLevel = 0; + final String fileName = + databaseName + + File.separatorChar + + dataRegionId + + File.separatorChar + + filePartitionId + + File.separator + + tsFileResource.getTsFile().getName(); final File targetFile = (tsFileResource.isGeneratedByPipeConsensus() || tsFileResource.isGeneratedByPipe()) - ? pipeAndIoTV2LoadDiskSelector.getTargetFile( - tsFileToLoad, - databaseName, - dataRegionId, - filePartitionId, - tsFileResource.getTsFile().getName(), - targetTierLevel) - : ordinaryLoadDiskSelector.getTargetFile( - tsFileToLoad, - databaseName, - dataRegionId, - filePartitionId, - tsFileResource.getTsFile().getName(), - targetTierLevel); + ? pipeAndIoTV2LoadDiskSelector.getTargetDir( + tsFileToLoad, TierManager.getInstance(), fileName, targetTierLevel) + : ordinaryLoadDiskSelector.getTargetDir( + tsFileToLoad, TierManager.getInstance(), fileName, targetTierLevel); tsFileResource.setFile(targetFile); if (tsFileManager.contains(tsFileResource, false)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java new file mode 100644 index 000000000000..f29d1c28f532 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.load.active; + +import org.apache.iotdb.commons.utils.RetryUtils; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector; +import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; +import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Objects; + +import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check; +import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check; + +public class ActiveLoadUtil { + + private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadUtil.class); + + private static volatile ILoadDiskSelector loadDiskSelector = + ILoadDiskSelector.initDiskSelector( + IoTDBDescriptor.getInstance().getConfig().getLoadDiskSelectStrategy(), + IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()); + + private static volatile FolderManager folderManager; + + static { + try { + folderManager = + new FolderManager( + Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()), + DirectoryStrategyType.SEQUENCE_STRATEGY); + } catch (final DiskSpaceInsufficientException e) { + LOGGER.error( + "Fail to create pipe receiver file folders allocation strategy because all disks of folders are full.", + e); + } + } + + public static boolean loadTsFileAsyncToActiveDir( + final List tsFiles, final String dataBaseName, final boolean isDeleteAfterLoad) { + if (tsFiles == null || tsFiles.isEmpty()) { + return true; + } + + try { + for (File file : tsFiles) { + if (!loadTsFilesAsyncToActiveDir(dataBaseName, file, isDeleteAfterLoad)) { + return false; + } + } + } catch (Exception e) { + LOGGER.warn("Fail to load tsfile to Active dir", e); + return false; + } + + return true; + } + + public static boolean loadTsFilesAsyncToActiveDir( + final String dataBaseName, final File file, final boolean isDeleteAfterLoad) + throws IOException { + if (file == null) { + return true; + } + + final String targetFilePath; + try { + targetFilePath = loadDiskSelector.getTargetDir(file, folderManager); + } catch (DiskSpaceInsufficientException e) { + LOGGER.warn("Fail to load disk space of file {}", file.getAbsolutePath(), e); + return false; + } + + if (targetFilePath == null) { + LOGGER.warn("Load active listening dir is not set."); + return false; + } + final File targetDir; + if (Objects.nonNull(dataBaseName)) { + targetDir = new File(targetFilePath, dataBaseName); + } else { + targetDir = new File(targetFilePath); + } + + loadTsFileAsyncToTargetDir(targetDir, file, isDeleteAfterLoad); + loadTsFileAsyncToTargetDir( + targetDir, new File(file.getAbsolutePath() + ".resource"), isDeleteAfterLoad); + loadTsFileAsyncToTargetDir( + targetDir, new File(file.getAbsolutePath() + ".mods"), isDeleteAfterLoad); + return true; + } + + private static void loadTsFileAsyncToTargetDir( + final File targetDir, final File file, final boolean isDeleteAfterLoad) throws IOException { + if (!file.exists()) { + return; + } + RetryUtils.retryOnException( + () -> { + if (isDeleteAfterLoad) { + moveFileWithMD5Check(file, targetDir); + } else { + copyFileWithMD5Check(file, targetDir); + } + return null; + }); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java index b14f0c4b5df4..a91373e29ff5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java @@ -20,20 +20,34 @@ package org.apache.iotdb.db.storageengine.load.disk; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import java.io.File; public interface ILoadDiskSelector { - File getTargetFile( - File fileToLoad, - String databaseName, - String dataRegionId, - long filePartitionId, - String tsfileName, - int tierLevel) + File getTargetDir(File fileToLoad, TierManager tierManager, String tsfileName, int tierLevel) throws DiskSpaceInsufficientException; + public String getTargetDir(File fileToLoad, FolderManager folderManager) + throws DiskSpaceInsufficientException; + + public LoadDiskSelectorType getLoadDiskSelectorType(); + + public static ILoadDiskSelector initDiskSelector(String selectStrategy, String[] dirs) { + ILoadDiskSelector selector; + switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(selectStrategy)) { + case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY: + selector = new InheritSystemMultiDisksStrategySelector(); + break; + case MIN_IO_FIRST: + default: + selector = new MinIOSelector(dirs); + } + return selector; + } + enum LoadDiskSelectorType { MIN_IO_FIRST("MIN_IO_FIRST"), INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY("INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY"), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java index 16e420c5e2f7..ef149fcc18a5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java @@ -20,6 +20,7 @@ package org.apache.iotdb.db.storageengine.load.disk; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.tsfile.fileSystem.FSFactoryProducer; @@ -29,30 +30,25 @@ public class InheritSystemMultiDisksStrategySelector implements ILoadDiskSelector { - protected final FSFactory fsFactory = FSFactoryProducer.getFSFactory(); + protected static final FSFactory fsFactory = FSFactoryProducer.getFSFactory(); - public InheritSystemMultiDisksStrategySelector() { - // empty body + public InheritSystemMultiDisksStrategySelector() {} + + @Override + public LoadDiskSelectorType getLoadDiskSelectorType() { + return LoadDiskSelectorType.INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY; } @Override - public File getTargetFile( - File fileToLoad, - String databaseName, - String dataRegionId, - long filePartitionId, - String tsfileName, - int tierLevel) + public File getTargetDir( + File fileToLoad, TierManager tierManager, String tsfileName, int tierLevel) throws DiskSpaceInsufficientException { // inherit system multi-disks select strategy, see configuration `dn_multi_dir_strategy` - return fsFactory.getFile( - TierManager.getInstance().getNextFolderForTsFile(tierLevel, false), - databaseName - + File.separatorChar - + dataRegionId - + File.separatorChar - + filePartitionId - + File.separator - + tsfileName); + return fsFactory.getFile(tierManager.getNextFolderForTsFile(tierLevel, false), tsfileName); + } + + public String getTargetDir(File fileToLoad, FolderManager folderManager) + throws DiskSpaceInsufficientException { + return folderManager.getNextFolder(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java index f6fac586700d..f3244defa690 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java @@ -20,9 +20,9 @@ package org.apache.iotdb.db.storageengine.load.disk; import org.apache.iotdb.commons.conf.IoTDBConstant; -import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; +import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; +import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.iotdb.metrics.utils.FileStoreUtils; import org.slf4j.Logger; @@ -31,6 +31,7 @@ import java.io.File; import java.nio.file.FileStore; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Objects; @@ -39,13 +40,18 @@ public class MinIOSelector extends InheritSystemMultiDisksStrategySelector { private static final Logger logger = LoggerFactory.getLogger(MinIOSelector.class); - private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); + private final Map rootDisks2DataDirsMapForLoad; - public MinIOSelector() { + public MinIOSelector(String[] dirs) { + if (dirs == null || dirs.length == 0) { + rootDisks2DataDirsMapForLoad = Collections.emptyMap(); + logger.warn("MinIO selector requires at least one directory"); + return; + } // init data dirs' root disks - this.rootDisks2DataDirsMapForLoad = new HashMap<>(config.getTierDataDirs()[0].length); - Arrays.stream(config.getTierDataDirs()[0]) + this.rootDisks2DataDirsMapForLoad = new HashMap<>(dirs.length); + Arrays.stream(dirs) .filter(Objects::nonNull) .map(v -> fsFactory.getFile(v, IoTDBConstant.UNSEQUENCE_FOLDER_NAME).getPath()) .forEach( @@ -69,13 +75,8 @@ public MinIOSelector() { } @Override - public File getTargetFile( - File fileToLoad, - String databaseName, - String dataRegionId, - long filePartitionId, - String tsfileName, - int tierLevel) + public File getTargetDir( + File fileToLoad, TierManager tierManager, String tsfileName, int tierLevel) throws DiskSpaceInsufficientException { File targetFile; String fileDirRoot = null; @@ -85,28 +86,53 @@ public File getTargetFile( .map(Object::toString) .orElse(null); } catch (Exception e) { - logger.warn("Exception occurs when reading target file's mount point {}", filePartitionId, e); + logger.warn( + "Exception occurs when reading target file's mount point {}", + fileToLoad.getAbsoluteFile(), + e); } if (rootDisks2DataDirsMapForLoad.containsKey(fileDirRoot)) { // if there is an overlap between firDirRoot and data directories' disk roots, try to get // targetFile in the same disk - targetFile = - fsFactory.getFile( - rootDisks2DataDirsMapForLoad.get(fileDirRoot), - databaseName - + File.separatorChar - + dataRegionId - + File.separatorChar - + filePartitionId - + File.separator - + tsfileName); + targetFile = fsFactory.getFile(rootDisks2DataDirsMapForLoad.get(fileDirRoot), tsfileName); return targetFile; } // if there isn't an overlap, downgrade to storage balance(sequence) strategy. - return super.getTargetFile( - fileToLoad, databaseName, dataRegionId, filePartitionId, tsfileName, tierLevel); + return super.getTargetDir(fileToLoad, tierManager, tsfileName, tierLevel); + } + + @Override + public String getTargetDir(File fileToLoad, FolderManager folderManager) + throws DiskSpaceInsufficientException { + File targetFile; + String fileDirRoot = null; + try { + fileDirRoot = + Optional.ofNullable(FileStoreUtils.getFileStore(fileToLoad.getCanonicalPath())) + .map(Object::toString) + .orElse(null); + } catch (Exception e) { + logger.warn( + "Exception occurs when reading target file's mount point {}", + fileToLoad.getAbsoluteFile(), + e); + } + + if (rootDisks2DataDirsMapForLoad.containsKey(fileDirRoot)) { + // if there is an overlap between firDirRoot and data directories' disk roots, try to get + // targetFile in the same disk + return rootDisks2DataDirsMapForLoad.get(fileDirRoot); + } + + // if there isn't an overlap, downgrade to storage balance(sequence) strategy. + return super.getTargetDir(fileToLoad, folderManager); + } + + @Override + public LoadDiskSelectorType getLoadDiskSelectorType() { + return LoadDiskSelectorType.MIN_IO_FIRST; } } From ef1210f63499cbc6bc5ee6f54a40b2176fbd08f1 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 17 Apr 2025 17:26:23 +0800 Subject: [PATCH 02/10] spotless --- .../db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java index e0fbb6faa1b6..671b8dc0e626 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java @@ -286,8 +286,8 @@ private boolean doAsyncLoad(final IAnalysis analysis) { setRealStatement(analysis); return true; } - LOGGER.info("Async Load has failed, and is now trying to load sync"); - return false; + LOGGER.info("Async Load has failed, and is now trying to load sync"); + return false; } finally { LoadTsFileCostMetricsSet.getInstance() .recordPhaseTimeCost(ANALYSIS_ASYNC_MOVE, System.nanoTime() - startTime); From 7478253379d03eb29cf10ce627cc895bd2f6c05e Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Tue, 22 Apr 2025 20:21:49 +0800 Subject: [PATCH 03/10] fix --- .../storageengine/dataregion/DataRegion.java | 28 ++++++--- .../load/active/ActiveLoadDirScanner.java | 1 + .../load/active/ActiveLoadUtil.java | 62 ++++++++++++------- .../load/disk/ILoadDiskSelector.java | 23 +++---- ...heritSystemMultiDisksStrategySelector.java | 24 +++---- .../load/disk/MinIOSelector.java | 57 +++++------------ 6 files changed, 99 insertions(+), 96 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 8aa2354c03ba..e83970fbb86c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -328,8 +328,8 @@ public class DataRegion implements IDataRegionForQuery { private final DataRegionMetrics metrics; - private ILoadDiskSelector ordinaryLoadDiskSelector; - private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector; + private ILoadDiskSelector ordinaryLoadDiskSelector; + private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector; /** * Construct a database processor. @@ -414,12 +414,24 @@ public DataRegion(String databaseName, String id) { } private void initDiskSelector() { + final ILoadDiskSelector.DiskDirectorySelector selector = + new ILoadDiskSelector.DiskDirectorySelector() { + @Override + public File selectDirectory(File file, Integer level) + throws DiskSpaceInsufficientException { + return fsFactory.getFile( + TierManager.getInstance().getNextFolderForTsFile(level, false), file.getName()); + } + }; + ordinaryLoadDiskSelector = ILoadDiskSelector.initDiskSelector( - config.getLoadDiskSelectStrategy(), config.getTierDataDirs()[0]); + config.getLoadDiskSelectStrategy(), config.getTierDataDirs()[0], selector); pipeAndIoTV2LoadDiskSelector = ILoadDiskSelector.initDiskSelector( - config.getLoadDiskSelectStrategyForIoTV2AndPipe(), config.getTierDataDirs()[0]); + config.getLoadDiskSelectStrategyForIoTV2AndPipe(), + config.getTierDataDirs()[0], + selector); } @Override @@ -3095,10 +3107,10 @@ private boolean loadTsFileToUnSequence( + tsFileResource.getTsFile().getName(); final File targetFile = (tsFileResource.isGeneratedByPipeConsensus() || tsFileResource.isGeneratedByPipe()) - ? pipeAndIoTV2LoadDiskSelector.getTargetDir( - tsFileToLoad, TierManager.getInstance(), fileName, targetTierLevel) - : ordinaryLoadDiskSelector.getTargetDir( - tsFileToLoad, TierManager.getInstance(), fileName, targetTierLevel); + ? pipeAndIoTV2LoadDiskSelector.diskDirectorySelector( + new File(tsFileToLoad, fileName), true, targetTierLevel) + : ordinaryLoadDiskSelector.diskDirectorySelector( + new File(tsFileToLoad, fileName), true, targetTierLevel); tsFileResource.setFile(targetFile); if (tsFileManager.contains(tsFileResource, false)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java index 34bb392dc284..3a16d4ca0255 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java @@ -178,6 +178,7 @@ private void hotReloadActiveLoadDirs() { listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs()); listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs())); + ActiveLoadUtil.updateLoadDisLoad(); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java index f29d1c28f532..6455076e60ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -42,25 +42,7 @@ public class ActiveLoadUtil { private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadUtil.class); - private static volatile ILoadDiskSelector loadDiskSelector = - ILoadDiskSelector.initDiskSelector( - IoTDBDescriptor.getInstance().getConfig().getLoadDiskSelectStrategy(), - IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()); - - private static volatile FolderManager folderManager; - - static { - try { - folderManager = - new FolderManager( - Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()), - DirectoryStrategyType.SEQUENCE_STRATEGY); - } catch (final DiskSpaceInsufficientException e) { - LOGGER.error( - "Fail to create pipe receiver file folders allocation strategy because all disks of folders are full.", - e); - } - } + private static volatile ILoadDiskSelector loadDiskSelector = updateLoadDisLoad(); public static boolean loadTsFileAsyncToActiveDir( final List tsFiles, final String dataBaseName, final boolean isDeleteAfterLoad) { @@ -89,9 +71,9 @@ public static boolean loadTsFilesAsyncToActiveDir( return true; } - final String targetFilePath; + final File targetFilePath; try { - targetFilePath = loadDiskSelector.getTargetDir(file, folderManager); + targetFilePath = loadDiskSelector.diskDirectorySelector(file, false, null); } catch (DiskSpaceInsufficientException e) { LOGGER.warn("Fail to load disk space of file {}", file.getAbsolutePath(), e); return false; @@ -105,7 +87,7 @@ public static boolean loadTsFilesAsyncToActiveDir( if (Objects.nonNull(dataBaseName)) { targetDir = new File(targetFilePath, dataBaseName); } else { - targetDir = new File(targetFilePath); + targetDir = targetFilePath; } loadTsFileAsyncToTargetDir(targetDir, file, isDeleteAfterLoad); @@ -131,4 +113,40 @@ private static void loadTsFileAsyncToTargetDir( return null; }); } + + public static ILoadDiskSelector updateLoadDisLoad() { + ILoadDiskSelector loadDiskSelector = + ILoadDiskSelector.initDiskSelector( + IoTDBDescriptor.getInstance().getConfig().getLoadDiskSelectStrategy(), + IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs(), + new ILoadDiskSelector.DiskDirectorySelector() { + private FolderManager folderManager; + + @Override + public File selectDirectory(File file, Void unused) + throws DiskSpaceInsufficientException { + initFolderManager(); + return new File(folderManager.getNextFolder()); + } + + private void initFolderManager() throws DiskSpaceInsufficientException { + if (folderManager == null) { + synchronized (this) { + if (folderManager != null) { + return; + } + final String[] loadActiveListeningDirs = + IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs(); + folderManager = + new FolderManager( + Arrays.asList(loadActiveListeningDirs), + DirectoryStrategyType.SEQUENCE_STRATEGY); + } + } + } + }); + + ActiveLoadUtil.loadDiskSelector = loadDiskSelector; + return loadDiskSelector; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java index a91373e29ff5..72e0d94ce097 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java @@ -20,32 +20,33 @@ package org.apache.iotdb.db.storageengine.load.disk; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import java.io.File; -public interface ILoadDiskSelector { +public interface ILoadDiskSelector { - File getTargetDir(File fileToLoad, TierManager tierManager, String tsfileName, int tierLevel) - throws DiskSpaceInsufficientException; + @FunctionalInterface + public interface DiskDirectorySelector { + File selectDirectory(File file, U u) throws DiskSpaceInsufficientException; + } - public String getTargetDir(File fileToLoad, FolderManager folderManager) + public File diskDirectorySelector(File file, boolean isNeedCreateTargetFile, U u) throws DiskSpaceInsufficientException; public LoadDiskSelectorType getLoadDiskSelectorType(); - public static ILoadDiskSelector initDiskSelector(String selectStrategy, String[] dirs) { - ILoadDiskSelector selector; + public static ILoadDiskSelector initDiskSelector( + String selectStrategy, String[] dirs, DiskDirectorySelector selector) { + ILoadDiskSelector diskSelector; switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(selectStrategy)) { case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY: - selector = new InheritSystemMultiDisksStrategySelector(); + diskSelector = new InheritSystemMultiDisksStrategySelector(selector); break; case MIN_IO_FIRST: default: - selector = new MinIOSelector(dirs); + diskSelector = new MinIOSelector(dirs, selector); } - return selector; + return diskSelector; } enum LoadDiskSelectorType { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java index ef149fcc18a5..5a1f8b99721d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java @@ -20,35 +20,29 @@ package org.apache.iotdb.db.storageengine.load.disk; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.fileSystem.fsFactory.FSFactory; import java.io.File; -public class InheritSystemMultiDisksStrategySelector implements ILoadDiskSelector { +public class InheritSystemMultiDisksStrategySelector implements ILoadDiskSelector { protected static final FSFactory fsFactory = FSFactoryProducer.getFSFactory(); - public InheritSystemMultiDisksStrategySelector() {} + DiskDirectorySelector directorySelector; - @Override - public LoadDiskSelectorType getLoadDiskSelectorType() { - return LoadDiskSelectorType.INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY; + public InheritSystemMultiDisksStrategySelector(DiskDirectorySelector selector) { + this.directorySelector = selector; } - @Override - public File getTargetDir( - File fileToLoad, TierManager tierManager, String tsfileName, int tierLevel) + public File diskDirectorySelector(File file, boolean createTargetFile, U u) throws DiskSpaceInsufficientException { - // inherit system multi-disks select strategy, see configuration `dn_multi_dir_strategy` - return fsFactory.getFile(tierManager.getNextFolderForTsFile(tierLevel, false), tsfileName); + return directorySelector.selectDirectory(file, u); } - public String getTargetDir(File fileToLoad, FolderManager folderManager) - throws DiskSpaceInsufficientException { - return folderManager.getNextFolder(); + @Override + public LoadDiskSelectorType getLoadDiskSelectorType() { + return LoadDiskSelectorType.INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java index f3244defa690..7dab5bb59550 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java @@ -21,8 +21,6 @@ import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; -import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager; -import org.apache.iotdb.db.storageengine.rescon.disk.TierManager; import org.apache.iotdb.metrics.utils.FileStoreUtils; import org.slf4j.Logger; @@ -37,13 +35,14 @@ import java.util.Objects; import java.util.Optional; -public class MinIOSelector extends InheritSystemMultiDisksStrategySelector { +public class MinIOSelector extends InheritSystemMultiDisksStrategySelector { private static final Logger logger = LoggerFactory.getLogger(MinIOSelector.class); private final Map rootDisks2DataDirsMapForLoad; - public MinIOSelector(String[] dirs) { + public MinIOSelector(String[] dirs, DiskDirectorySelector selector) { + super(selector); if (dirs == null || dirs.length == 0) { rootDisks2DataDirsMapForLoad = Collections.emptyMap(); logger.warn("MinIO selector requires at least one directory"); @@ -75,60 +74,38 @@ public MinIOSelector(String[] dirs) { } @Override - public File getTargetDir( - File fileToLoad, TierManager tierManager, String tsfileName, int tierLevel) + public File diskDirectorySelector(File file, boolean createTargetFile, U u) throws DiskSpaceInsufficientException { - File targetFile; + final File targetDir = file.getParentFile(); + final String fileName = file.getName(); String fileDirRoot = null; try { fileDirRoot = - Optional.ofNullable(FileStoreUtils.getFileStore(fileToLoad.getCanonicalPath())) + Optional.ofNullable(FileStoreUtils.getFileStore(targetDir.getCanonicalPath())) .map(Object::toString) .orElse(null); } catch (Exception e) { logger.warn( "Exception occurs when reading target file's mount point {}", - fileToLoad.getAbsoluteFile(), + targetDir.getAbsoluteFile(), e); } + File targetFile = null; if (rootDisks2DataDirsMapForLoad.containsKey(fileDirRoot)) { - // if there is an overlap between firDirRoot and data directories' disk roots, try to get - // targetFile in the same disk - targetFile = fsFactory.getFile(rootDisks2DataDirsMapForLoad.get(fileDirRoot), tsfileName); + if (createTargetFile) { + // if there is an overlap between firDirRoot and data directories' disk roots, try to get + // targetFile in the same disk + targetFile = fsFactory.getFile(rootDisks2DataDirsMapForLoad.get(fileDirRoot), fileName); + } else { + targetFile = new File(rootDisks2DataDirsMapForLoad.get(fileDirRoot)); + } return targetFile; } // if there isn't an overlap, downgrade to storage balance(sequence) strategy. - return super.getTargetDir(fileToLoad, tierManager, tsfileName, tierLevel); - } - - @Override - public String getTargetDir(File fileToLoad, FolderManager folderManager) - throws DiskSpaceInsufficientException { - File targetFile; - String fileDirRoot = null; - try { - fileDirRoot = - Optional.ofNullable(FileStoreUtils.getFileStore(fileToLoad.getCanonicalPath())) - .map(Object::toString) - .orElse(null); - } catch (Exception e) { - logger.warn( - "Exception occurs when reading target file's mount point {}", - fileToLoad.getAbsoluteFile(), - e); - } - - if (rootDisks2DataDirsMapForLoad.containsKey(fileDirRoot)) { - // if there is an overlap between firDirRoot and data directories' disk roots, try to get - // targetFile in the same disk - return rootDisks2DataDirsMapForLoad.get(fileDirRoot); - } - - // if there isn't an overlap, downgrade to storage balance(sequence) strategy. - return super.getTargetDir(fileToLoad, folderManager); + return super.diskDirectorySelector(file, createTargetFile, u); } @Override From c90a152bac3f8064f6009f7305a022bf16b94788 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 24 Apr 2025 11:40:49 +0800 Subject: [PATCH 04/10] fix --- .../thrift/IoTDBDataNodeReceiver.java | 8 ++--- .../load/active/ActiveLoadUtil.java | 36 +++++++++++++++++-- 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java index 4c897f455215..f2608c73f034 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java @@ -109,7 +109,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.nio.file.Paths; @@ -545,11 +544,8 @@ protected TSStatus loadFileV2( private TSStatus loadTsFileAsync(final String dataBaseName, final List absolutePaths) throws IOException { - for (String fileAbsolutePath : absolutePaths) { - if (!ActiveLoadUtil.loadTsFilesAsyncToActiveDir( - dataBaseName, new File(fileAbsolutePath), true)) { - throw new PipeException("Load active listening pipe dir is not set."); - } + if (!ActiveLoadUtil.loadFilesToActiveDir(dataBaseName, absolutePaths, true)) { + throw new PipeException("Load active listening pipe dir is not set."); } return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java index 6455076e60ad..1aab260de8d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -52,7 +52,7 @@ public static boolean loadTsFileAsyncToActiveDir( try { for (File file : tsFiles) { - if (!loadTsFilesAsyncToActiveDir(dataBaseName, file, isDeleteAfterLoad)) { + if (!loadTsFilesToActiveDir(dataBaseName, file, isDeleteAfterLoad)) { return false; } } @@ -64,7 +64,7 @@ public static boolean loadTsFileAsyncToActiveDir( return true; } - public static boolean loadTsFilesAsyncToActiveDir( + public static boolean loadTsFilesToActiveDir( final String dataBaseName, final File file, final boolean isDeleteAfterLoad) throws IOException { if (file == null) { @@ -98,6 +98,38 @@ public static boolean loadTsFilesAsyncToActiveDir( return true; } + public static boolean loadFilesToActiveDir( + final String dataBaseName, final List files, final boolean isDeleteAfterLoad) + throws IOException { + if (files == null || files.isEmpty()) { + return true; + } + + final File targetFilePath; + try { + targetFilePath = loadDiskSelector.diskDirectorySelector(new File(files.get(0)), false, null); + } catch (DiskSpaceInsufficientException e) { + LOGGER.warn("Fail to load disk space of file {}", files.get(0), e); + return false; + } + + if (targetFilePath == null) { + LOGGER.warn("Load active listening dir is not set."); + return false; + } + final File targetDir; + if (Objects.nonNull(dataBaseName)) { + targetDir = new File(targetFilePath, dataBaseName); + } else { + targetDir = targetFilePath; + } + + for (final String file : files) { + loadTsFileAsyncToTargetDir(targetDir, new File(file), isDeleteAfterLoad); + } + return true; + } + private static void loadTsFileAsyncToTargetDir( final File targetDir, final File file, final boolean isDeleteAfterLoad) throws IOException { if (!file.exists()) { From f9220be1e18fae08e516f4ef712a7753b3038cf9 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 24 Apr 2025 11:45:16 +0800 Subject: [PATCH 05/10] update ActiveLoadUtil --- .../iotdb/db/storageengine/load/active/ActiveLoadUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java index 1aab260de8d1..16c76ba0ce98 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -64,7 +64,7 @@ public static boolean loadTsFileAsyncToActiveDir( return true; } - public static boolean loadTsFilesToActiveDir( + private static boolean loadTsFilesToActiveDir( final String dataBaseName, final File file, final boolean isDeleteAfterLoad) throws IOException { if (file == null) { From ff131e1f03d7c8339ea44856abec57c3fbedf7ce Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 24 Apr 2025 11:47:33 +0800 Subject: [PATCH 06/10] update InheritSystemMultiDisksStrategySelector --- .../load/disk/InheritSystemMultiDisksStrategySelector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java index 5a1f8b99721d..d239fa2c5d5d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java @@ -30,7 +30,7 @@ public class InheritSystemMultiDisksStrategySelector implements ILoadDiskSele protected static final FSFactory fsFactory = FSFactoryProducer.getFSFactory(); - DiskDirectorySelector directorySelector; + protected final DiskDirectorySelector directorySelector; public InheritSystemMultiDisksStrategySelector(DiskDirectorySelector selector) { this.directorySelector = selector; From 1c4737ecb9190a260273804de8c429ca66b5720c Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Thu, 24 Apr 2025 12:31:24 +0800 Subject: [PATCH 07/10] update ILoadDisksSelector --- .../db/storageengine/dataregion/DataRegion.java | 9 +++++---- .../load/active/ActiveLoadUtil.java | 10 +++++++--- .../load/disk/ILoadDiskSelector.java | 17 +++++++++++------ ...InheritSystemMultiDisksStrategySelector.java | 7 ++++--- .../storageengine/load/disk/MinIOSelector.java | 13 ++++++------- 5 files changed, 33 insertions(+), 23 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index e83970fbb86c..9cf3b6a8d9ee 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -417,10 +417,11 @@ private void initDiskSelector() { final ILoadDiskSelector.DiskDirectorySelector selector = new ILoadDiskSelector.DiskDirectorySelector() { @Override - public File selectDirectory(File file, Integer level) + public File selectDirectory( + final File sourceDirectory, final String fileName, final Integer level) throws DiskSpaceInsufficientException { return fsFactory.getFile( - TierManager.getInstance().getNextFolderForTsFile(level, false), file.getName()); + TierManager.getInstance().getNextFolderForTsFile(level, false), fileName); } }; @@ -3108,9 +3109,9 @@ private boolean loadTsFileToUnSequence( final File targetFile = (tsFileResource.isGeneratedByPipeConsensus() || tsFileResource.isGeneratedByPipe()) ? pipeAndIoTV2LoadDiskSelector.diskDirectorySelector( - new File(tsFileToLoad, fileName), true, targetTierLevel) + tsFileToLoad.getParentFile(), fileName, true, targetTierLevel) : ordinaryLoadDiskSelector.diskDirectorySelector( - new File(tsFileToLoad, fileName), true, targetTierLevel); + tsFileToLoad.getParentFile(), fileName, true, targetTierLevel); tsFileResource.setFile(targetFile); if (tsFileManager.contains(tsFileResource, false)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java index 16c76ba0ce98..a4caf32f74d1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -73,7 +73,8 @@ private static boolean loadTsFilesToActiveDir( final File targetFilePath; try { - targetFilePath = loadDiskSelector.diskDirectorySelector(file, false, null); + targetFilePath = + loadDiskSelector.diskDirectorySelector(file.getParentFile(), file.getName(), false, null); } catch (DiskSpaceInsufficientException e) { LOGGER.warn("Fail to load disk space of file {}", file.getAbsolutePath(), e); return false; @@ -107,7 +108,9 @@ public static boolean loadFilesToActiveDir( final File targetFilePath; try { - targetFilePath = loadDiskSelector.diskDirectorySelector(new File(files.get(0)), false, null); + final File file = new File(files.get(0)); + targetFilePath = + loadDiskSelector.diskDirectorySelector(file.getParentFile(), file.getName(), false, null); } catch (DiskSpaceInsufficientException e) { LOGGER.warn("Fail to load disk space of file {}", files.get(0), e); return false; @@ -155,7 +158,8 @@ public static ILoadDiskSelector updateLoadDisLoad() { private FolderManager folderManager; @Override - public File selectDirectory(File file, Void unused) + public File selectDirectory( + final File sourceDir, final String fileName, final Void unused) throws DiskSpaceInsufficientException { initFolderManager(); return new File(folderManager.getNextFolder()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java index 72e0d94ce097..1be8b01cc131 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java @@ -27,17 +27,22 @@ public interface ILoadDiskSelector { @FunctionalInterface public interface DiskDirectorySelector { - File selectDirectory(File file, U u) throws DiskSpaceInsufficientException; + File selectDirectory(final File sourceDirectory, final String fileName, final U u) + throws DiskSpaceInsufficientException; } - public File diskDirectorySelector(File file, boolean isNeedCreateTargetFile, U u) + public File diskDirectorySelector( + final File sourceDirectory, + final String fileName, + final boolean isNeedCreateTargetFile, + final U u) throws DiskSpaceInsufficientException; public LoadDiskSelectorType getLoadDiskSelectorType(); public static ILoadDiskSelector initDiskSelector( - String selectStrategy, String[] dirs, DiskDirectorySelector selector) { - ILoadDiskSelector diskSelector; + final String selectStrategy, final String[] dirs, final DiskDirectorySelector selector) { + final ILoadDiskSelector diskSelector; switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(selectStrategy)) { case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY: diskSelector = new InheritSystemMultiDisksStrategySelector(selector); @@ -58,7 +63,7 @@ enum LoadDiskSelectorType { private final String value; - LoadDiskSelectorType(String value) { + LoadDiskSelectorType(final String value) { this.value = value; } @@ -66,7 +71,7 @@ public String getValue() { return value; } - public static LoadDiskSelectorType fromValue(String value) { + public static LoadDiskSelectorType fromValue(final String value) { if (value.equalsIgnoreCase(MIN_IO_FIRST.getValue())) { return MIN_IO_FIRST; } else if (value.equalsIgnoreCase(INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY.getValue())) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java index d239fa2c5d5d..42b60d9d0618 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java @@ -32,13 +32,14 @@ public class InheritSystemMultiDisksStrategySelector implements ILoadDiskSele protected final DiskDirectorySelector directorySelector; - public InheritSystemMultiDisksStrategySelector(DiskDirectorySelector selector) { + public InheritSystemMultiDisksStrategySelector(final DiskDirectorySelector selector) { this.directorySelector = selector; } - public File diskDirectorySelector(File file, boolean createTargetFile, U u) + public File diskDirectorySelector( + final File sourceDirectory, final String FileName, final boolean createTargetFile, final U u) throws DiskSpaceInsufficientException { - return directorySelector.selectDirectory(file, u); + return directorySelector.selectDirectory(sourceDirectory, FileName, u); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java index 7dab5bb59550..0d810d17055e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java @@ -41,7 +41,7 @@ public class MinIOSelector extends InheritSystemMultiDisksStrategySelector private final Map rootDisks2DataDirsMapForLoad; - public MinIOSelector(String[] dirs, DiskDirectorySelector selector) { + public MinIOSelector(final String[] dirs, final DiskDirectorySelector selector) { super(selector); if (dirs == null || dirs.length == 0) { rootDisks2DataDirsMapForLoad = Collections.emptyMap(); @@ -74,20 +74,19 @@ public MinIOSelector(String[] dirs, DiskDirectorySelector selector) { } @Override - public File diskDirectorySelector(File file, boolean createTargetFile, U u) + public File diskDirectorySelector( + final File sourceDirectory, final String fileName, final boolean createTargetFile, final U u) throws DiskSpaceInsufficientException { - final File targetDir = file.getParentFile(); - final String fileName = file.getName(); String fileDirRoot = null; try { fileDirRoot = - Optional.ofNullable(FileStoreUtils.getFileStore(targetDir.getCanonicalPath())) + Optional.ofNullable(FileStoreUtils.getFileStore(sourceDirectory.getCanonicalPath())) .map(Object::toString) .orElse(null); } catch (Exception e) { logger.warn( "Exception occurs when reading target file's mount point {}", - targetDir.getAbsoluteFile(), + sourceDirectory.getAbsoluteFile(), e); } @@ -105,7 +104,7 @@ public File diskDirectorySelector(File file, boolean createTargetFile, U u) } // if there isn't an overlap, downgrade to storage balance(sequence) strategy. - return super.diskDirectorySelector(file, createTargetFile, u); + return super.diskDirectorySelector(sourceDirectory, fileName, createTargetFile, u); } @Override From 96bb36d79dc14a097dd7fb812ba1bae5b78c35f0 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Sun, 27 Apr 2025 10:37:20 +0800 Subject: [PATCH 08/10] update ActiveLoadUtil --- .../iotdb/db/storageengine/load/active/ActiveLoadUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java index a4caf32f74d1..b338f9045c97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -155,7 +155,8 @@ public static ILoadDiskSelector updateLoadDisLoad() { IoTDBDescriptor.getInstance().getConfig().getLoadDiskSelectStrategy(), IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs(), new ILoadDiskSelector.DiskDirectorySelector() { - private FolderManager folderManager; + + private volatile FolderManager folderManager; @Override public File selectDirectory( From 33ec71cc6ee6ce27a903459e86ab96364034a6c1 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 30 Apr 2025 16:22:41 +0800 Subject: [PATCH 09/10] fix --- .../org/apache/iotdb/db/conf/IoTDBConfig.java | 6 +++++- .../db/storageengine/dataregion/DataRegion.java | 16 +++++++++------- .../load/active/ActiveLoadUtil.java | 13 ++++++------- .../load/disk/ILoadDiskSelector.java | 13 +++++-------- .../InheritSystemMultiDisksStrategySelector.java | 4 ++-- .../storageengine/load/disk/MinIOSelector.java | 10 ++++------ 6 files changed, 31 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 4673e9c66378..f601b93ab584 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -3872,7 +3872,11 @@ public void setLoadDiskSelectStrategy(String loadDiskSelectStrategy) { } public String getLoadDiskSelectStrategyForIoTV2AndPipe() { - return loadDiskSelectStrategyForIoTV2AndPipe; + return LoadDiskSelectorType.INHERIT_LOAD + .getValue() + .equals(loadDiskSelectStrategyForIoTV2AndPipe) + ? getLoadDiskSelectStrategy() + : loadDiskSelectStrategyForIoTV2AndPipe; } public void setLoadDiskSelectStrategyForIoTV2AndPipe( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 11f7068b1a1e..2425a564bd9a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.cluster.NodeStatus; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.conf.CommonDescriptor; +import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.MetadataException; import org.apache.iotdb.commons.file.SystemFileFactory; @@ -425,14 +426,15 @@ public File selectDirectory( } }; + final String[] dirs = + Arrays.stream(config.getTierDataDirs()[0]) + .map(v -> fsFactory.getFile(v, IoTDBConstant.UNSEQUENCE_FOLDER_NAME).getPath()) + .toArray(String[]::new); ordinaryLoadDiskSelector = - ILoadDiskSelector.initDiskSelector( - config.getLoadDiskSelectStrategy(), config.getTierDataDirs()[0], selector); + ILoadDiskSelector.initDiskSelector(config.getLoadDiskSelectStrategy(), dirs, selector); pipeAndIoTV2LoadDiskSelector = ILoadDiskSelector.initDiskSelector( - config.getLoadDiskSelectStrategyForIoTV2AndPipe(), - config.getTierDataDirs()[0], - selector); + config.getLoadDiskSelectStrategyForIoTV2AndPipe(), dirs, selector); } @Override @@ -3110,9 +3112,9 @@ private boolean loadTsFileToUnSequence( + tsFileResource.getTsFile().getName(); final File targetFile = (tsFileResource.isGeneratedByPipeConsensus() || tsFileResource.isGeneratedByPipe()) - ? pipeAndIoTV2LoadDiskSelector.diskDirectorySelector( + ? pipeAndIoTV2LoadDiskSelector.selectTargetDirectory( tsFileToLoad.getParentFile(), fileName, true, targetTierLevel) - : ordinaryLoadDiskSelector.diskDirectorySelector( + : ordinaryLoadDiskSelector.selectTargetDirectory( tsFileToLoad.getParentFile(), fileName, true, targetTierLevel); tsFileResource.setFile(targetFile); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java index b338f9045c97..4bd226d0a6f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -74,7 +74,7 @@ private static boolean loadTsFilesToActiveDir( final File targetFilePath; try { targetFilePath = - loadDiskSelector.diskDirectorySelector(file.getParentFile(), file.getName(), false, null); + loadDiskSelector.selectTargetDirectory(file.getParentFile(), file.getName(), false, null); } catch (DiskSpaceInsufficientException e) { LOGGER.warn("Fail to load disk space of file {}", file.getAbsolutePath(), e); return false; @@ -110,7 +110,7 @@ public static boolean loadFilesToActiveDir( try { final File file = new File(files.get(0)); targetFilePath = - loadDiskSelector.diskDirectorySelector(file.getParentFile(), file.getName(), false, null); + loadDiskSelector.selectTargetDirectory(file.getParentFile(), file.getName(), false, null); } catch (DiskSpaceInsufficientException e) { LOGGER.warn("Fail to load disk space of file {}", files.get(0), e); return false; @@ -150,10 +150,11 @@ private static void loadTsFileAsyncToTargetDir( } public static ILoadDiskSelector updateLoadDisLoad() { + final String[] dirs = IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs(); ILoadDiskSelector loadDiskSelector = ILoadDiskSelector.initDiskSelector( IoTDBDescriptor.getInstance().getConfig().getLoadDiskSelectStrategy(), - IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs(), + dirs, new ILoadDiskSelector.DiskDirectorySelector() { private volatile FolderManager folderManager; @@ -172,12 +173,10 @@ private void initFolderManager() throws DiskSpaceInsufficientException { if (folderManager != null) { return; } - final String[] loadActiveListeningDirs = - IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs(); + folderManager = new FolderManager( - Arrays.asList(loadActiveListeningDirs), - DirectoryStrategyType.SEQUENCE_STRATEGY); + Arrays.asList(dirs), DirectoryStrategyType.SEQUENCE_STRATEGY); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java index 1be8b01cc131..814e8b7881fe 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/ILoadDiskSelector.java @@ -26,21 +26,18 @@ public interface ILoadDiskSelector { @FunctionalInterface - public interface DiskDirectorySelector { + interface DiskDirectorySelector { File selectDirectory(final File sourceDirectory, final String fileName, final U u) throws DiskSpaceInsufficientException; } - public File diskDirectorySelector( - final File sourceDirectory, - final String fileName, - final boolean isNeedCreateTargetFile, - final U u) + File selectTargetDirectory( + final File sourceDirectory, final String fileName, final boolean appendFileName, final U u) throws DiskSpaceInsufficientException; - public LoadDiskSelectorType getLoadDiskSelectorType(); + LoadDiskSelectorType getLoadDiskSelectorType(); - public static ILoadDiskSelector initDiskSelector( + static ILoadDiskSelector initDiskSelector( final String selectStrategy, final String[] dirs, final DiskDirectorySelector selector) { final ILoadDiskSelector diskSelector; switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(selectStrategy)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java index 42b60d9d0618..1d6b4df55201 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/InheritSystemMultiDisksStrategySelector.java @@ -36,8 +36,8 @@ public InheritSystemMultiDisksStrategySelector(final DiskDirectorySelector se this.directorySelector = selector; } - public File diskDirectorySelector( - final File sourceDirectory, final String FileName, final boolean createTargetFile, final U u) + public File selectTargetDirectory( + final File sourceDirectory, final String FileName, final boolean appendFileName, final U u) throws DiskSpaceInsufficientException { return directorySelector.selectDirectory(sourceDirectory, FileName, u); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java index 0d810d17055e..95fb5b8f2e5f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/disk/MinIOSelector.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.load.disk; -import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.db.exception.DiskSpaceInsufficientException; import org.apache.iotdb.metrics.utils.FileStoreUtils; @@ -52,7 +51,6 @@ public MinIOSelector(final String[] dirs, final DiskDirectorySelector selecto this.rootDisks2DataDirsMapForLoad = new HashMap<>(dirs.length); Arrays.stream(dirs) .filter(Objects::nonNull) - .map(v -> fsFactory.getFile(v, IoTDBConstant.UNSEQUENCE_FOLDER_NAME).getPath()) .forEach( dataDirPath -> { File dataDirFile = new File(dataDirPath); @@ -74,8 +72,8 @@ public MinIOSelector(final String[] dirs, final DiskDirectorySelector selecto } @Override - public File diskDirectorySelector( - final File sourceDirectory, final String fileName, final boolean createTargetFile, final U u) + public File selectTargetDirectory( + final File sourceDirectory, final String fileName, final boolean appendFileName, final U u) throws DiskSpaceInsufficientException { String fileDirRoot = null; try { @@ -92,7 +90,7 @@ public File diskDirectorySelector( File targetFile = null; if (rootDisks2DataDirsMapForLoad.containsKey(fileDirRoot)) { - if (createTargetFile) { + if (appendFileName) { // if there is an overlap between firDirRoot and data directories' disk roots, try to get // targetFile in the same disk targetFile = fsFactory.getFile(rootDisks2DataDirsMapForLoad.get(fileDirRoot), fileName); @@ -104,7 +102,7 @@ public File diskDirectorySelector( } // if there isn't an overlap, downgrade to storage balance(sequence) strategy. - return super.diskDirectorySelector(sourceDirectory, fileName, createTargetFile, u); + return super.selectTargetDirectory(sourceDirectory, fileName, appendFileName, u); } @Override From 7490995672df8f6f06d89d67393542311cc7e119 Mon Sep 17 00:00:00 2001 From: luoluoyuyu Date: Wed, 30 Apr 2025 16:32:28 +0800 Subject: [PATCH 10/10] fix --- .../db/storageengine/load/active/ActiveLoadDirScanner.java | 2 +- .../iotdb/db/storageengine/load/active/ActiveLoadUtil.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java index 3a16d4ca0255..9ac840bf91d6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java @@ -178,7 +178,7 @@ private void hotReloadActiveLoadDirs() { listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs()); listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs())); - ActiveLoadUtil.updateLoadDisLoad(); + ActiveLoadUtil.updateLoadDiskSelector(); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java index 4bd226d0a6f9..603c2bcc5085 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadUtil.java @@ -42,7 +42,7 @@ public class ActiveLoadUtil { private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadUtil.class); - private static volatile ILoadDiskSelector loadDiskSelector = updateLoadDisLoad(); + private static volatile ILoadDiskSelector loadDiskSelector = updateLoadDiskSelector(); public static boolean loadTsFileAsyncToActiveDir( final List tsFiles, final String dataBaseName, final boolean isDeleteAfterLoad) { @@ -149,7 +149,7 @@ private static void loadTsFileAsyncToTargetDir( }); } - public static ILoadDiskSelector updateLoadDisLoad() { + public static ILoadDiskSelector updateLoadDiskSelector() { final String[] dirs = IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs(); ILoadDiskSelector loadDiskSelector = ILoadDiskSelector.initDiskSelector(