Skip to content

Pipe/Load: Implement multi-disk awareness of multiple file systems during file copying and moving #15356

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@
import org.apache.iotdb.commons.pipe.receiver.IoTDBFileReceiver;
import org.apache.iotdb.commons.pipe.receiver.PipeReceiverStatusHandler;
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.conf.IoTDBConfig;
Expand Down Expand Up @@ -95,6 +93,7 @@
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
import org.apache.iotdb.db.tools.schema.SRStatementGenerator;
Expand All @@ -110,7 +109,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
Expand Down Expand Up @@ -546,35 +544,9 @@ protected TSStatus loadFileV2(

private TSStatus loadTsFileAsync(final String dataBaseName, final List<String> absolutePaths)
throws IOException {
final String loadActiveListeningPipeDir = IOTDB_CONFIG.getLoadActiveListeningPipeDir();
if (Objects.isNull(loadActiveListeningPipeDir)) {
if (!ActiveLoadUtil.loadFilesToActiveDir(dataBaseName, absolutePaths, true)) {
throw new PipeException("Load active listening pipe dir is not set.");
}

if (Objects.nonNull(dataBaseName)) {
final File targetDir = new File(loadActiveListeningPipeDir, dataBaseName);
return this.loadTsFileAsyncToTargetDir(targetDir, absolutePaths);
}

return loadTsFileAsyncToTargetDir(new File(loadActiveListeningPipeDir), absolutePaths);
}

private TSStatus loadTsFileAsyncToTargetDir(
final File targetDir, final List<String> absolutePaths) throws IOException {
for (final String absolutePath : absolutePaths) {
if (absolutePath == null) {
continue;
}
final File sourceFile = new File(absolutePath);
if (!Objects.equals(
targetDir.getAbsolutePath(), sourceFile.getParentFile().getAbsolutePath())) {
RetryUtils.retryOnException(
() -> {
FileUtils.moveFileWithMD5Check(sourceFile, targetDir);
return null;
});
}
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.auth.AuthException;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.utils.RetryUtils;
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
Expand Down Expand Up @@ -50,6 +49,7 @@
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
import org.apache.iotdb.db.utils.TimestampPrecisionUtils;
Expand Down Expand Up @@ -81,8 +81,6 @@
import java.util.Objects;
import java.util.Optional;

import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
import static org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
import static org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.validateDatabaseName;
import static org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
Expand Down Expand Up @@ -282,75 +280,20 @@ private boolean checkBeforeAnalyzeFileByFile(IAnalysis analysis) {
private boolean doAsyncLoad(final IAnalysis analysis) {
final long startTime = System.nanoTime();
try {
final String[] loadActiveListeningDirs =
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
String targetFilePath = null;
for (int i = 0, size = loadActiveListeningDirs == null ? 0 : loadActiveListeningDirs.length;
i < size;
i++) {
if (loadActiveListeningDirs[i] != null) {
targetFilePath = loadActiveListeningDirs[i];
break;
}
}
if (targetFilePath == null) {
LOGGER.warn("Load active listening dir is not set. Will try sync load instead.");
return false;
}

try {
if (Objects.nonNull(databaseForTableData)) {
loadTsFilesAsyncToTargetDir(new File(targetFilePath, databaseForTableData), tsFiles);
} else {
loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
}
} catch (Exception e) {
LOGGER.warn(
"Failed to async load tsfiles {} to target dir {}. Will try sync load instead.",
tsFiles,
targetFilePath,
e);
return false;
if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(
tsFiles, databaseForTableData, isDeleteAfterLoad)) {
analysis.setFinishQueryAfterAnalyze(true);
setRealStatement(analysis);
return true;
}

analysis.setFinishQueryAfterAnalyze(true);
setRealStatement(analysis);
return true;
LOGGER.info("Async Load has failed, and is now trying to load sync");
return false;
} finally {
LoadTsFileCostMetricsSet.getInstance()
.recordPhaseTimeCost(ANALYSIS_ASYNC_MOVE, System.nanoTime() - startTime);
}
}

private void loadTsFilesAsyncToTargetDir(final File targetDir, final List<File> files)
throws IOException {
for (final File file : files) {
if (file == null) {
continue;
}

loadTsFileAsyncToTargetDir(targetDir, file);
loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + ".resource"));
loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + ".mods"));
}
}

private void loadTsFileAsyncToTargetDir(final File targetDir, final File file)
throws IOException {
if (!file.exists()) {
return;
}
RetryUtils.retryOnException(
() -> {
if (isDeleteAfterLoad) {
moveFileWithMD5Check(file, targetDir);
} else {
copyFileWithMD5Check(file, targetDir);
}
return null;
});
}

private boolean doAnalyzeFileByFile(IAnalysis analysis) {
// analyze tsfile metadata file by file
for (int i = 0, tsfileNum = tsFiles.size(); i < tsfileNum; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,6 @@
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALFlushListener;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.listener.WALRecoverListener;
import org.apache.iotdb.db.storageengine.load.disk.ILoadDiskSelector;
import org.apache.iotdb.db.storageengine.load.disk.InheritSystemMultiDisksStrategySelector;
import org.apache.iotdb.db.storageengine.load.disk.MinIOSelector;
import org.apache.iotdb.db.storageengine.load.limiter.LoadTsFileRateLimiter;
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo;
Expand Down Expand Up @@ -330,8 +328,8 @@ public class DataRegion implements IDataRegionForQuery {

private final DataRegionMetrics metrics;

private ILoadDiskSelector ordinaryLoadDiskSelector;
private ILoadDiskSelector pipeAndIoTV2LoadDiskSelector;
private ILoadDiskSelector<Integer> ordinaryLoadDiskSelector;
private ILoadDiskSelector<Integer> pipeAndIoTV2LoadDiskSelector;

/**
* Construct a database processor.
Expand Down Expand Up @@ -416,27 +414,25 @@ public DataRegion(String databaseName, String id) {
}

private void initDiskSelector() {
switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(config.getLoadDiskSelectStrategy())) {
case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY:
ordinaryLoadDiskSelector = new InheritSystemMultiDisksStrategySelector();
break;
case MIN_IO_FIRST:
default:
ordinaryLoadDiskSelector = new MinIOSelector();
}
final ILoadDiskSelector.DiskDirectorySelector<Integer> selector =
new ILoadDiskSelector.DiskDirectorySelector<Integer>() {
@Override
public File selectDirectory(
final File sourceDirectory, final String fileName, final Integer level)
throws DiskSpaceInsufficientException {
return fsFactory.getFile(
TierManager.getInstance().getNextFolderForTsFile(level, false), fileName);
}
};

switch (ILoadDiskSelector.LoadDiskSelectorType.fromValue(
config.getLoadDiskSelectStrategyForIoTV2AndPipe())) {
case MIN_IO_FIRST:
pipeAndIoTV2LoadDiskSelector = new MinIOSelector();
break;
case INHERIT_SYSTEM_MULTI_DISKS_SELECT_STRATEGY:
pipeAndIoTV2LoadDiskSelector = new InheritSystemMultiDisksStrategySelector();
break;
case INHERIT_LOAD:
default:
pipeAndIoTV2LoadDiskSelector = ordinaryLoadDiskSelector;
}
ordinaryLoadDiskSelector =
ILoadDiskSelector.initDiskSelector(
config.getLoadDiskSelectStrategy(), config.getTierDataDirs()[0], selector);
pipeAndIoTV2LoadDiskSelector =
ILoadDiskSelector.initDiskSelector(
config.getLoadDiskSelectStrategyForIoTV2AndPipe(),
config.getTierDataDirs()[0],
selector);
}

@Override
Expand Down Expand Up @@ -3102,22 +3098,20 @@ private boolean loadTsFileToUnSequence(
boolean isGeneratedByPipe)
throws LoadFileException, DiskSpaceInsufficientException {
final int targetTierLevel = 0;
final String fileName =
databaseName
+ File.separatorChar
+ dataRegionId
+ File.separatorChar
+ filePartitionId
+ File.separator
+ tsFileResource.getTsFile().getName();
final File targetFile =
(tsFileResource.isGeneratedByPipeConsensus() || tsFileResource.isGeneratedByPipe())
? pipeAndIoTV2LoadDiskSelector.getTargetFile(
tsFileToLoad,
databaseName,
dataRegionId,
filePartitionId,
tsFileResource.getTsFile().getName(),
targetTierLevel)
: ordinaryLoadDiskSelector.getTargetFile(
tsFileToLoad,
databaseName,
dataRegionId,
filePartitionId,
tsFileResource.getTsFile().getName(),
targetTierLevel);
? pipeAndIoTV2LoadDiskSelector.diskDirectorySelector(
tsFileToLoad.getParentFile(), fileName, true, targetTierLevel)
: ordinaryLoadDiskSelector.diskDirectorySelector(
tsFileToLoad.getParentFile(), fileName, true, targetTierLevel);

tsFileResource.setFile(targetFile);
if (tsFileManager.contains(tsFileResource, false)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ private void hotReloadActiveLoadDirs() {

listeningDirsConfig.set(IOTDB_CONFIG.getLoadActiveListeningDirs());
listeningDirs.addAll(Arrays.asList(IOTDB_CONFIG.getLoadActiveListeningDirs()));
ActiveLoadUtil.updateLoadDisLoad();
}
}
}
Expand Down
Loading