diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml
index 72fd2f79348e..dbe23d292d65 100644
--- a/dolphinscheduler-bom/pom.xml
+++ b/dolphinscheduler-bom/pom.xml
@@ -131,6 +131,7 @@
2.11.0
2.4.1
1.0.1
+        <zt-zip.version>1.17</zt-zip.version>
@@ -973,6 +974,12 @@
${credentials-java.version}
+            <dependency>
+                <groupId>org.zeroturnaround</groupId>
+                <artifactId>zt-zip</artifactId>
+                <version>${zt-zip.version}</version>
+            </dependency>
+
org.testcontainers
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
index 499ed0cd5cf5..825b819c9f2b 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java
@@ -391,4 +391,11 @@ public final class Constants {
public static final String RELEASE_STATE = "releaseState";
public static final String EXECUTE_TYPE = "executeType";
+ /**
+ * Constants used when FILE-type task parameters are transferred between tasks through the resource storage.
+ */
+ public static final String CRC_SUFFIX = ".crc"; // appended to a file path to name its checksum file
+ public static final String DOWNLOAD_TMP = ".DT_TMP"; // temp directory name under the task execute path for downloaded packs
+ public static final String PACK_SUFFIX = "_ds_pack.zip"; // suffix of the zip created when an output parameter is a directory
+ public static final String RESOURCE_TAG = "DATA_TRANSFER"; // first path segment of an uploaded file's resource path
}
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
index d9a4226f2971..a55f2d9f9831 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/DateUtils.java
@@ -29,6 +29,7 @@
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
+import java.util.Objects;
import java.util.TimeZone;
import javax.annotation.Nonnull;
@@ -700,4 +701,14 @@ public static String getTimestampString() {
return String.valueOf(System.currentTimeMillis());
}
+    /**
+     * Renders an epoch-millisecond timestamp as text, interpreting it in the system default time zone.
+     *
+     * @param timeMillis        milliseconds since the epoch, e.g. the result of System.currentTimeMillis()
+     * @param dateTimeFormatter formatter describing the expected output, e.g. yyyy-MM-dd HH:mm:ss; must not be null
+     * @return the formatted date-time string
+     * @throws NullPointerException if {@code dateTimeFormatter} is null
+     */
+    public static String formatTimeStamp(long timeMillis, DateTimeFormatter dateTimeFormatter) {
+        Objects.requireNonNull(dateTimeFormatter);
+        final Instant instant = Instant.ofEpochMilli(timeMillis);
+        final LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
+        return dateTimeFormatter.format(localDateTime);
+    }
}
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java
index 8a68b7579f97..fb77c615517a 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImpl.java
@@ -294,7 +294,15 @@ private Map parseLocalParamsMap(AbstractParameters parameters)
Map localParametersMaps = new LinkedHashMap<>();
if (CollectionUtils.isNotEmpty(parameters.getLocalParams())) {
parameters.getLocalParams()
- .forEach(localParam -> localParametersMaps.put(localParam.getProp(), localParam));
+ .forEach(localParam -> {
+ // A FILE parameter with direction IN carries the upstream var pool key in its value, so it is
+ // keyed by that value (the key used later to look up the file to download), not by its own prop.
+ if (DataType.FILE.equals(localParam.getType()) && Direct.IN.equals(localParam.getDirect())) {
+ localParametersMaps.put(localParam.getValue(), localParam);
+ } else {
+ localParametersMaps.put(localParam.getProp(), localParam);
+ }
+ });
}
return localParametersMaps;
}
diff --git a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java
index 1f0beb349eee..31cb0c12bbf4 100644
--- a/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java
+++ b/dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java
@@ -232,6 +232,60 @@ public void testParamParsingPreparation() {
String.valueOf(workflowDefinition.getCode()));
}
+    /**
+     * A FILE parameter with direction IN is resolved through the upstream var pool key stored in its value:
+     * the prepared map is keyed by that key, carries the upstream value and keeps the local parameter name
+     * as prop.
+     */
+    @Test
+    public void testParamParsingPreparation_withLocalFileParams() {
+        // Workflow definition and project the instances below belong to.
+        WorkflowDefinition workflowDefinition = new WorkflowDefinition();
+        workflowDefinition.setName("ProcessName-1");
+        workflowDefinition.setProjectName("ProjectName");
+        workflowDefinition.setProjectCode(3000001L);
+        workflowDefinition.setCode(200001L);
+
+        Project project = new Project();
+        project.setName("ProjectName");
+        project.setCode(3000001L);
+
+        // Workflow instance started by a backfill (complement data) command.
+        final BackfillWorkflowCommandParam backfillWorkflowCommandParam = BackfillWorkflowCommandParam.builder()
+                .timeZone("Asia/Shanghai")
+                .build();
+        WorkflowInstance workflowInstance = new WorkflowInstance();
+        workflowInstance.setId(2);
+        workflowInstance.setHistoryCmd(CommandType.COMPLEMENT_DATA.toString());
+        workflowInstance.setCommandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam));
+        workflowInstance.setGlobalParams("");
+        workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode());
+        workflowInstance.setProjectCode(workflowDefinition.getProjectCode());
+
+        TaskDefinition taskDefinition = new TaskDefinition();
+        taskDefinition.setName("TaskName-1");
+        taskDefinition.setCode(1000001L);
+
+        // Task instance whose var pool already holds the upstream task's FILE output.
+        String varPoolParams = "[{\"prop\":\"task1.file\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"file\"}]";
+        TaskInstance taskInstance = new TaskInstance();
+        taskInstance.setId(1);
+        taskInstance.setExecutePath("home/path/execute");
+        taskInstance.setTaskCode(taskDefinition.getCode());
+        taskInstance.setTaskDefinitionVersion(taskDefinition.getVersion());
+        taskInstance.setProjectCode(workflowDefinition.getProjectCode());
+        taskInstance.setWorkflowInstanceId(workflowInstance.getId());
+        taskInstance.setVarPool(varPoolParams);
+
+        // Local FILE/IN parameter "file_new" pointing at the upstream key "task1.file".
+        Property fileInParam = new Property("file_new", Direct.IN, DataType.FILE, "task1.file");
+        AbstractParameters parameters = Mockito.mock(AbstractParameters.class);
+        Mockito.when(parameters.getLocalParams()).thenReturn(Lists.newArrayList(fileInParam));
+
+        Mockito.when(projectParameterMapper.queryByProjectCode(Mockito.anyLong())).thenReturn(Collections.emptyList());
+
+        Map<String, Property> propertyMap =
+                curingParamsServiceImpl.paramParsingPreparation(taskInstance, parameters, workflowInstance,
+                        project.getName(), workflowDefinition.getName());
+        Assertions.assertNotNull(propertyMap);
+
+        // Fail with a clear message instead of a NullPointerException when the key is missing.
+        Property resolvedFileParam = propertyMap.get("task1.file");
+        Assertions.assertNotNull(resolvedFileParam, "FILE/IN parameter must be keyed by its upstream var pool key");
+        Assertions.assertEquals("file", resolvedFileParam.getValue());
+        Assertions.assertEquals("file_new", resolvedFileParam.getProp());
+    }
+
@Test
public void testParseGlobalParamsMap() throws Exception {
WorkflowInstance workflowInstance = new WorkflowInstance();
diff --git a/dolphinscheduler-worker/pom.xml b/dolphinscheduler-worker/pom.xml
index efadb5508af7..fa415ee9ab30 100644
--- a/dolphinscheduler-worker/pom.xml
+++ b/dolphinscheduler-worker/pom.xml
@@ -217,6 +217,10 @@
dolphinscheduler-actuator-authentication
+        <dependency>
+            <groupId>org.zeroturnaround</groupId>
+            <artifactId>zt-zip</artifactId>
+        </dependency>
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
index 7118a099d5f9..e767da743ded 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutor.java
@@ -23,11 +23,14 @@
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogMarkers;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils;
+import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
import org.apache.dolphinscheduler.server.worker.utils.TenantUtils;
import org.apache.dolphinscheduler.task.executor.AbstractTaskExecutor;
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
@@ -36,6 +39,7 @@
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent;
import java.util.ArrayList;
+import java.util.List;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -71,6 +75,22 @@ protected void initializeTaskPlugin() {
log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
}
+    /**
+     * Tracks the executor state through the parent implementation and, when that state is
+     * {@code SUCCEEDED}, uploads the task's output files before handing the state back.
+     *
+     * @return the state reported by the parent implementation
+     */
+    @Override
+    public TaskExecutorState trackTaskExecutorState() {
+        final TaskExecutorState trackedState = super.trackTaskExecutorState();
+        if (!TaskExecutorState.SUCCEEDED.equals(trackedState)) {
+            return trackedState;
+        }
+        uploadOutputFilesIfNeeded();
+        return trackedState;
+    }
+
+ private void uploadOutputFilesIfNeeded() {
+ List uploadOutputFiles = TaskFilesTransferUtils.tryUploadOutputFiles(
+ taskExecutionContext,
+ storageOperator);
+ log.info("Upload output files successfully: {}", uploadOutputFiles);
+ }
+
@Override
protected void doTriggerTaskPlugin() {
final ITaskExecutor taskExecutor = this;
@@ -130,13 +150,20 @@ protected void initializeTaskContext() {
TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
log.info("TaskInstance working directory: {} create successfully", taskExecutionContext.getExecutePath());
+ TaskChannel taskChannel = physicalTaskPluginFactory.getTaskChannel(this);
final ResourceContext resourceContext = TaskExecutionContextUtils.downloadResourcesIfNeeded(
- physicalTaskPluginFactory.getTaskChannel(this),
+ taskChannel,
storageOperator,
taskExecutionContext);
taskExecutionContext.setResourceContext(resourceContext);
log.info("Download resources successfully: {}", taskExecutionContext.getResourceContext());
+ List downloadUpstreamFiles = TaskFilesTransferUtils.tryDownloadUpstreamFiles(
+ taskChannel,
+ taskExecutionContext,
+ storageOperator);
+ log.info("Download upstream files successfully: {}", downloadUpstreamFiles);
+
log.info(TaskLogMarkers.excludeInTaskLog(), "Initialized Task Context{}",
JSONUtils.toPrettyJsonString(taskExecutionContext));
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
new file mode 100644
index 000000000000..6c6969f57e43
--- /dev/null
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtils.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.FileUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskException;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.zeroturnaround.zip.ZipUtil;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+/**
+ * Transfers FILE-typed task parameters between a task's working directory and the resource storage.
+ * <p>
+ * Output FILE parameters are uploaded and published in the var pool under {@code <taskName>.<prop>};
+ * input FILE parameters are resolved through the prepared parameters map and downloaded into the
+ * working directory.
+ */
+@Slf4j
+public final class TaskFilesTransferUtils {
+
+    /** Day pattern of the date segment in a resource path; the formatter is immutable, so it is shared. */
+    private static final DateTimeFormatter RESOURCE_DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd");
+
+    private TaskFilesTransferUtils() {
+        throw new UnsupportedOperationException("Utility class");
+    }
+
+    /**
+     * Upload the task's output FILE parameter files to the resource center.
+     * <p>
+     * Directories are zipped first. A checksum file is uploaded next to every file. Afterwards the matching
+     * OUT entry of the var pool (created when missing) is renamed to {@code <taskName>.<prop>} and its value
+     * is replaced by the resource path.
+     *
+     * @param context         task execution context
+     * @param storageOperator resource storage operator
+     * @return a list of uploaded output file properties, empty when the task declares none
+     * @throws TaskException if a declared file does not exist, or the checksum or upload fails
+     */
+    public static List<Property> tryUploadOutputFiles(TaskExecutionContext context,
+                                                      StorageOperator storageOperator) {
+        List<Property> outFileParams = getOutFileLocalParams(context);
+        if (outFileParams.isEmpty()) {
+            log.debug("No output files to upload");
+            return outFileParams;
+        }
+
+        List<Property> varPool = context.getVarPool();
+        // Index the OUT entries of the var pool by name for quick lookup.
+        Map<String, Property> varPoolsMap = varPool.stream()
+                .filter(property -> Direct.OUT.equals(property.getDirect()))
+                .collect(Collectors.toMap(Property::getProp, property -> property));
+
+        for (Property outProp : outFileParams) {
+            String localPath = context.getExecutePath() + File.separator + outProp.getValue();
+            String resourcePath = uploadWithChecksum(context, storageOperator, localPath);
+
+            // Reuse the existing var pool entry, or publish a new one when the task did not set it.
+            Property varPoolProperty = varPoolsMap.get(outProp.getProp());
+            if (varPoolProperty == null) {
+                varPoolProperty = new Property(outProp.getProp(), Direct.OUT, DataType.FILE, outProp.getValue());
+                varPool.add(varPoolProperty);
+            }
+            varPoolProperty.setProp(String.format("%s.%s", context.getTaskName(), varPoolProperty.getProp()));
+            varPoolProperty.setValue(resourcePath);
+        }
+
+        return outFileParams;
+    }
+
+    /**
+     * Packs the local path when it is a directory, writes its checksum file and uploads both.
+     *
+     * @return the resource path (relative to the tenant's storage) the file was uploaded to
+     */
+    private static String uploadWithChecksum(TaskExecutionContext context,
+                                             StorageOperator storageOperator,
+                                             String localPath) {
+        String srcPath = packIfDirectory(localPath);
+        String srcCrcPath = srcPath + Constants.CRC_SUFFIX;
+
+        // The checksum is computed from the original local path, not from the generated archive.
+        try {
+            FileUtils.writeContent2File(FileUtils.getFileChecksum(localPath), srcCrcPath);
+        } catch (IOException e) {
+            throw new TaskException("Generate CRC file failed: " + srcCrcPath, e);
+        }
+
+        String resourcePath = buildResourcePath(context, new File(srcPath).getName());
+        String resourceCrcPath = resourcePath + Constants.CRC_SUFFIX;
+        try {
+            String resourceFullPath = storageOperator.getStorageFileAbsolutePath(
+                    context.getTenantCode(),
+                    resourcePath);
+            String resourceCrcFullPath = storageOperator.getStorageFileAbsolutePath(
+                    context.getTenantCode(),
+                    resourceCrcPath);
+            log.info("Upload {} -> {}", srcPath, resourceFullPath);
+            storageOperator.upload(srcPath, resourceFullPath, false, true);
+            log.info("Upload CRC {} -> {}", srcCrcPath, resourceCrcFullPath);
+            storageOperator.upload(srcCrcPath, resourceCrcFullPath, false, true);
+        } catch (Exception e) {
+            throw new TaskException("Upload file to storage failed: " + srcPath, e);
+        }
+        return resourcePath;
+    }
+
+    /**
+     * Download upstream task's FILE parameter files from resource center.
+     * <p>
+     * The value of every IN FILE parameter is the key of the upstream entry in the prepared parameters map;
+     * that entry's value is the resource path to download. Packed directories are downloaded into a
+     * temporary directory and unpacked to {@code <executePath>/<prop>}; plain files are downloaded to that
+     * path directly. The temporary directory is removed even when a download fails.
+     *
+     * @param taskChannel     task channel used to parse task parameters
+     * @param context         task execution context, used to get local working directory and other information
+     * @param storageOperator resource storage operator
+     * @return a list of download input file properties, empty when the task declares none
+     * @throws TaskException if an upstream key cannot be found in the prepared parameters
+     */
+    public static List<Property> tryDownloadUpstreamFiles(TaskChannel taskChannel,
+                                                          TaskExecutionContext context,
+                                                          StorageOperator storageOperator) {
+        AbstractParameters abstractParameters = taskChannel.parseParameters(context.getTaskParams());
+        // A task without local parameters may expose null here; treat it as "nothing to download".
+        List<Property> localParams = abstractParameters == null ? null : abstractParameters.getLocalParams();
+        List<Property> inFileParams = localParams == null
+                ? new ArrayList<>()
+                : localParams.stream()
+                        .filter(prop -> isFileParam(prop, Direct.IN))
+                        .collect(Collectors.toList());
+
+        if (inFileParams.isEmpty()) {
+            log.debug("No input files to download");
+            return inFileParams;
+        }
+
+        // A missing map is handled like an empty one so the descriptive TaskException below is raised.
+        Map<String, Property> prepareParamsMap = context.getPrepareParamsMap() == null
+                ? java.util.Collections.<String, Property>emptyMap()
+                : context.getPrepareParamsMap();
+
+        String executePath = context.getExecutePath();
+        String downloadTmpPath = executePath + File.separator + Constants.DOWNLOAD_TMP;
+
+        try {
+            for (Property fileProp : inFileParams) {
+                String paramName = fileProp.getProp();
+                String fileParamVarPoolKey = fileProp.getValue();
+                Property prepareParam = prepareParamsMap.get(fileParamVarPoolKey);
+                if (prepareParam == null) {
+                    // Report the key that was actually looked up, not the local parameter name.
+                    log.error("{} not in {}", fileParamVarPoolKey, prepareParamsMap.keySet());
+                    throw new TaskException(
+                            String.format("Can not find upstream file using %s, please check the key",
+                                    fileParamVarPoolKey));
+                }
+
+                String resourcePath = prepareParam.getValue();
+                String localTargetPath = executePath + File.separator + paramName;
+
+                // Packed directories go through the temp directory first, plain files straight to the target.
+                boolean isPack = resourcePath.endsWith(Constants.PACK_SUFFIX);
+                String downloadPath = isPack
+                        ? downloadTmpPath + File.separator + new File(resourcePath).getName()
+                        : localTargetPath;
+
+                String resourceFullPath =
+                        storageOperator.getStorageFileAbsolutePath(context.getTenantCode(), resourcePath);
+                log.info("Download {} -> {}", resourceFullPath, downloadPath);
+                storageOperator.download(resourceFullPath, downloadPath, true);
+
+                if (isPack) {
+                    log.info("Unpack {} to {}", downloadPath, localTargetPath);
+                    ZipUtil.unpack(new File(downloadPath), new File(localTargetPath));
+                }
+            }
+        } finally {
+            deleteTempDirectoryQuietly(downloadTmpPath);
+        }
+
+        return inFileParams;
+    }
+
+    /**
+     * Best-effort removal of the temporary download directory; a failure is only logged.
+     */
+    private static void deleteTempDirectoryQuietly(String downloadTmpPath) {
+        try {
+            org.apache.commons.io.FileUtils.deleteDirectory(new File(downloadTmpPath));
+        } catch (IOException e) {
+            log.warn("Delete temp directory {} failed, ignored.", downloadTmpPath, e);
+        }
+    }
+
+    /**
+     * Collects the local parameters of the task that are FILE typed with direction OUT.
+     *
+     * @param taskExecutionContext context whose task params JSON is inspected
+     * @return the matching properties; empty when the task params carry no {@code localParams} node
+     */
+    public static List<Property> getOutFileLocalParams(TaskExecutionContext taskExecutionContext) {
+        List<Property> outFileParams = new ArrayList<>();
+        JsonNode taskParams = JSONUtils.parseObject(taskExecutionContext.getTaskParams());
+        // JsonNode#get returns null for an absent field; iterating that would throw.
+        JsonNode localParams = taskParams == null ? null : taskParams.get("localParams");
+        if (localParams == null) {
+            return outFileParams;
+        }
+        for (JsonNode localParam : localParams) {
+            Property property = JSONUtils.parseObject(localParam.toString(), Property.class);
+            if (isFileParam(property, Direct.OUT)) {
+                outFileParams.add(property);
+            }
+        }
+        return outFileParams;
+    }
+
+    /**
+     * Null-safe check that the property is FILE typed and has the given direction.
+     */
+    private static boolean isFileParam(Property property, Direct direct) {
+        return property != null
+                && direct.equals(property.getDirect())
+                && DataType.FILE.equals(property.getType());
+    }
+
+    /**
+     * Zips the path when it is a directory.
+     *
+     * @param path local file or directory
+     * @return the path of the created zip ({@code path} plus the pack suffix) for a directory,
+     *         otherwise {@code path} itself
+     * @throws TaskException if the path does not exist
+     */
+    public static String packIfDirectory(String path) {
+        File file = new File(path);
+        if (!file.exists()) {
+            throw new TaskException("File not found: " + path);
+        }
+        if (!file.isDirectory()) {
+            return path;
+        }
+        String zipPath = file.getPath() + Constants.PACK_SUFFIX;
+        ZipUtil.pack(file, new File(zipPath));
+        return zipPath;
+    }
+
+    /**
+     * Builds the resource path of an uploaded file:
+     * {@code DATA_TRANSFER/<yyyyMMdd>/<workflowCode>/<workflowVersion>_<workflowInstanceId>/<taskName>_<taskInstanceId>_<fileName>}.
+     * The date comes from the context start time; spaces in the task name are replaced by underscores.
+     *
+     * @param context  task execution context providing the identifiers
+     * @param fileName name of the file to upload
+     * @return the resource path relative to the tenant's storage
+     */
+    public static String buildResourcePath(TaskExecutionContext context, String fileName) {
+        String date = DateUtils.formatTimeStamp(context.getStartTime(), RESOURCE_DATE_FORMATTER);
+        String folder = String.format("%s/%s/%d/%d_%d",
+                Constants.RESOURCE_TAG, date,
+                context.getWorkflowDefinitionCode(),
+                context.getWorkflowDefinitionVersion(),
+                context.getWorkflowInstanceId());
+        String safeTaskName = context.getTaskName().replace(" ", "_");
+        return String.format("%s/%s_%s_%s", folder, safeTaskName, context.getTaskInstanceId(), fileName);
+    }
+}
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorTest.java
new file mode 100644
index 000000000000..1c9f7cc25d9a
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.executor;
+
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
+import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils;
+import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
+import org.apache.dolphinscheduler.server.worker.utils.TenantUtils;
+import org.apache.dolphinscheduler.task.executor.TaskExecutorState;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+/**
+ * Verifies that {@link PhysicalTaskExecutor} delegates file parameter transfer to
+ * {@link TaskFilesTransferUtils}: upstream files are downloaded while the task context is initialized,
+ * and output files are uploaded only when the tracked state is {@code SUCCEEDED}.
+ */
+@ExtendWith(MockitoExtension.class)
+class PhysicalTaskExecutorTest {
+
+    private final String tenantCode = "ubuntu";
+
+    @Mock
+    private StorageOperator storageOperator;
+    @Mock
+    private PhysicalTaskPluginFactory pluginFactory;
+    @Mock
+    private TaskChannel taskChannel;
+
+    private PhysicalTaskExecutor executor;
+    private TaskExecutionContext context;
+
+    @BeforeEach
+    void setUp() {
+        final long processDefineCode = 123;
+        final int processDefineVersion = 456;
+        final int processInstanceId = 678;
+        final int taskInstanceId = 789;
+        final String taskName = "test";
+
+        context = TaskExecutionContext.builder()
+                .workflowInstanceId(processInstanceId)
+                .workflowDefinitionVersion(processDefineVersion)
+                .workflowDefinitionCode(processDefineCode)
+                .taskInstanceId(taskInstanceId)
+                .taskName(taskName)
+                .tenantCode(tenantCode)
+                .startTime(System.currentTimeMillis())
+                .build();
+
+        PhysicalTaskExecutorBuilder physicalTaskExecutorBuilder = PhysicalTaskExecutorBuilder.builder()
+                .taskExecutionContext(context)
+                .storageOperator(storageOperator)
+                .physicalTaskPluginFactory(pluginFactory)
+                .build();
+        executor = new PhysicalTaskExecutor(physicalTaskExecutorBuilder);
+    }
+
+    @Test
+    void testInitializeTaskContext_shouldDownloadUpstreamFiles() {
+        // Static collaborators are mocked so that no tenant, directory or storage access happens.
+        try (
+                MockedStatic<TenantUtils> tenantUtilsMock = Mockito.mockStatic(TenantUtils.class);
+                MockedStatic<TaskExecutionContextUtils> contextUtilsMock =
+                        Mockito.mockStatic(TaskExecutionContextUtils.class);
+                MockedStatic<TaskFilesTransferUtils> transferUtilsMock =
+                        Mockito.mockStatic(TaskFilesTransferUtils.class)) {
+
+            tenantUtilsMock.when(() -> TenantUtils.getOrCreateActualTenant(Mockito.any(), Mockito.any()))
+                    .thenReturn(tenantCode);
+            contextUtilsMock
+                    .when(() -> TaskExecutionContextUtils.downloadResourcesIfNeeded(Mockito.any(), Mockito.any(),
+                            Mockito.any()))
+                    .thenReturn(new ResourceContext());
+
+            Mockito.when(pluginFactory.getTaskChannel(Mockito.any(PhysicalTaskExecutor.class)))
+                    .thenReturn(taskChannel);
+
+            List<Property> downloadedFiles = new ArrayList<>();
+            transferUtilsMock.when(() -> TaskFilesTransferUtils.tryDownloadUpstreamFiles(
+                    Mockito.eq(taskChannel), Mockito.eq(context), Mockito.eq(storageOperator)))
+                    .thenReturn(downloadedFiles);
+
+            executor.initializeTaskContext();
+
+            transferUtilsMock.verify(() -> TaskFilesTransferUtils.tryDownloadUpstreamFiles(
+                    Mockito.eq(taskChannel), Mockito.eq(context), Mockito.eq(storageOperator)), Mockito.times(1));
+        }
+    }
+
+    @Test
+    void testTrackTaskExecutorState_whenTaskSucceeded_shouldUploadOutputFiles() {
+        try (
+                MockedStatic<TaskFilesTransferUtils> transferUtilsMock =
+                        Mockito.mockStatic(TaskFilesTransferUtils.class)) {
+            List<Property> uploaded = new ArrayList<>();
+            transferUtilsMock.when(() -> TaskFilesTransferUtils.tryUploadOutputFiles(
+                    Mockito.eq(context), Mockito.eq(storageOperator))).thenReturn(uploaded);
+
+            // Force the plugin status so the executor reports SUCCEEDED.
+            PhysicalTaskExecutor spyExecutor = Mockito.spy(executor);
+            Mockito.doReturn(TaskExecutorState.SUCCEEDED).when(spyExecutor).doTrackTaskPluginStatus();
+
+            spyExecutor.trackTaskExecutorState();
+
+            transferUtilsMock.verify(() -> TaskFilesTransferUtils.tryUploadOutputFiles(
+                    Mockito.eq(context), Mockito.eq(storageOperator)), Mockito.times(1));
+        }
+    }
+
+    @Test
+    void testTrackTaskExecutorState_whenTaskFailed_shouldNotUploadOutputFiles() {
+        try (
+                MockedStatic<TaskFilesTransferUtils> transferUtilsMock =
+                        Mockito.mockStatic(TaskFilesTransferUtils.class)) {
+            // Force the plugin status so the executor reports FAILED.
+            PhysicalTaskExecutor spyExecutor = Mockito.spy(executor);
+            Mockito.doReturn(TaskExecutorState.FAILED).when(spyExecutor).doTrackTaskPluginStatus();
+
+            spyExecutor.trackTaskExecutorState();
+
+            transferUtilsMock.verify(() -> TaskFilesTransferUtils.tryUploadOutputFiles(
+                    Mockito.any(), Mockito.any()), Mockito.never());
+        }
+    }
+}
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java
new file mode 100644
index 000000000000..855003cc0921
--- /dev/null
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/utils/TaskFilesTransferUtilsTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.utils;
+
+import org.apache.dolphinscheduler.common.constants.Constants;
+import org.apache.dolphinscheduler.common.utils.DateUtils;
+import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator;
+import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
+import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
+import org.apache.dolphinscheduler.plugin.task.api.model.Property;
+import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
+
+import org.apache.curator.shaded.com.google.common.io.Files;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+import org.zeroturnaround.zip.ZipUtil;
+
+public class TaskFilesTransferUtilsTest {
+
+ private final long processDefineCode = 123; // workflow definition code put into every test context
+ private final int processDefineVersion = 456; // workflow definition version put into every test context
+ private final int processInstanceId = 678; // workflow instance id put into every test context
+ private final int taskInstanceId = 789; // task instance id put into every test context
+ private final String taskName = "test"; // task name, also the prefix of published var pool props
+
+ private final String tenantCode = "ubuntu"; // tenant passed to the storage operator
+
+ private long startTime; // refreshed in init() before each test
+
+ private String exceptTemplate; // expected resource path up to (excluding) the "_<fileName>" suffix, rebuilt in init()
+
+ @BeforeEach
+ void init() {
+ startTime = System.currentTimeMillis();
+ String date = DateUtils.formatTimeStamp(startTime, DateTimeFormatter.ofPattern("yyyyMMdd"));
+ exceptTemplate = String.format("%s/%s/%d/%d_%d/%s_%d",
+ Constants.RESOURCE_TAG,
+ date,
+ processDefineCode,
+ processDefineVersion,
+ processInstanceId,
+ taskName,
+ taskInstanceId);
+ }
+
+    /**
+     * Uploads one directory and one file declared as OUT FILE parameters and checks that the returned
+     * properties are the declared ones, and that the var pool publishes both under
+     * {@code <taskName>.<prop>} with the resource path as value while leaving other entries untouched.
+     */
+    @Test
+    void testTryUploadOutputFiles(@TempDir Path tempDir) throws IOException {
+        // Fixture: <executePath>/data/test.txt. A broken fixture must fail the test, not skip it.
+        File executePath = tempDir.toFile();
+        File folderPath = new File(executePath, "data");
+        File file = new File(folderPath, "test.txt");
+        Assertions.assertTrue(folderPath.mkdirs(), "Failed to create fixture directory " + folderPath);
+        Assertions.assertTrue(file.createNewFile(), "Failed to create fixture file " + file);
+
+        String fileParamValue = String.format("%s/%s", folderPath.getName(), file.getName());
+        String params = "[" +
+                String.format("{\"prop\":\"folder\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"%s\"},",
+                        folderPath.getName())
+                +
+                String.format(" {\"prop\":\"file\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"%s\"},",
+                        fileParamValue)
+                +
+                "{\"prop\":\"a\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"a\"}," +
+                "{\"prop\":\"b\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"b\"}" +
+                "]";
+        String taskParams = String.format("{\"localParams\": %s}", params);
+
+        // The var pool already knows "folder" but not "file", so both update paths are exercised.
+        List<Property> varPool = new ArrayList<>(4);
+        varPool.add(new Property("folder", Direct.OUT, DataType.FILE, folderPath.getName()));
+        varPool.add(new Property("a", Direct.OUT, DataType.VARCHAR, "a"));
+        varPool.add(new Property("b", Direct.OUT, DataType.VARCHAR, "b"));
+        TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder()
+                .taskParams(taskParams)
+                .workflowInstanceId(processInstanceId)
+                .workflowDefinitionVersion(processDefineVersion)
+                .workflowDefinitionCode(processDefineCode)
+                .taskInstanceId(taskInstanceId)
+                .taskName(taskName)
+                .tenantCode(tenantCode)
+                .executePath(executePath.toString())
+                .startTime(startTime)
+                .varPool(varPool)
+                .build();
+
+        StorageOperator storageOperator = Mockito.mock(StorageOperator.class);
+        List<Property> uploadOutputFiles =
+                TaskFilesTransferUtils.tryUploadOutputFiles(taskExecutionContext, storageOperator);
+
+        // The returned list mirrors the declared OUT FILE parameters, in declaration order.
+        Assertions.assertNotNull(uploadOutputFiles);
+        Assertions.assertEquals(2, uploadOutputFiles.size());
+
+        Assertions.assertEquals("folder", uploadOutputFiles.get(0).getProp());
+        Assertions.assertEquals(folderPath.getName(), uploadOutputFiles.get(0).getValue());
+
+        Assertions.assertEquals("file", uploadOutputFiles.get(1).getProp());
+        Assertions.assertEquals(fileParamValue, uploadOutputFiles.get(1).getValue());
+
+        String exceptFolder =
+                String.format("%s_%s", exceptTemplate, folderPath.getName() + Constants.PACK_SUFFIX);
+        String exceptFile = String.format("%s_%s", exceptTemplate, file.getName());
+        List<Property> contextVarPool = taskExecutionContext.getVarPool();
+        Assertions.assertEquals(4, contextVarPool.size());
+
+        Assertions.assertEquals(String.format("%s.%s", taskName, "folder"), contextVarPool.get(0).getProp());
+        Assertions.assertEquals(exceptFolder, contextVarPool.get(0).getValue());
+
+        // Compare against literals: the pool holds the very same objects, so comparing them with
+        // themselves would always pass.
+        Assertions.assertEquals("a", contextVarPool.get(1).getProp());
+        Assertions.assertEquals("a", contextVarPool.get(1).getValue());
+        Assertions.assertEquals("b", contextVarPool.get(2).getProp());
+        Assertions.assertEquals("b", contextVarPool.get(2).getValue());
+
+        Assertions.assertEquals(String.format("%s.%s", taskName, "file"), contextVarPool.get(3).getProp());
+        Assertions.assertEquals(exceptFile, contextVarPool.get(3).getValue());
+    }
+
+ @Test
+ void testTryDownloadUpstreamFiles(@TempDir Path tempDir) {
+ File executePath = tempDir.toFile();
+ String folderPath = exceptTemplate + "_folder" + Constants.PACK_SUFFIX;
+ String filePath = exceptTemplate + "_file";
+
+ Map prepareParamsMap = new HashMap<>();
+ prepareParamsMap.put("task1.folder", new Property("folder", Direct.IN, DataType.FILE, folderPath));
+ prepareParamsMap.put("task2.file", new Property("file", Direct.IN, DataType.FILE, filePath));
+ prepareParamsMap.put("a", new Property("a", Direct.OUT, DataType.VARCHAR, "a"));
+ String localParams = "[" +
+ "{\"prop\":\"folder\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task1.folder\"}," +
+ " {\"prop\":\"file\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task2.file\"}," +
+ " {\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"a\"}" +
+ "]";
+ String taskParams = String.format("{\"localParams\": %s}", localParams);
+ TaskExecutionContext taskExecutionContext = TaskExecutionContext.builder()
+ .prepareParamsMap(prepareParamsMap)
+ .taskParams(taskParams)
+ .workflowInstanceId(processInstanceId)
+ .workflowDefinitionVersion(processDefineVersion)
+ .workflowDefinitionCode(processDefineCode)
+ .taskInstanceId(taskInstanceId)
+ .taskName(taskName)
+ .tenantCode(tenantCode)
+ .executePath(executePath.toString())
+ .startTime(startTime)
+ .build();
+
+ StorageOperator storageOperator = Mockito.mock(StorageOperator.class);
+ Mockito.mockStatic(ZipUtil.class);
+ TaskChannel taskChannel = Mockito.mock(TaskChannel.class);
+ AbstractParameters abstractParameters = Mockito.mock(AbstractParameters.class);
+ Mockito.when(abstractParameters.getLocalParams()).thenReturn(JSONUtils.toList(localParams, Property.class));
+ Mockito.when(taskChannel.parseParameters(Mockito.anyString())).thenReturn(abstractParameters);
+
+ List downloadUpstreamFiles = TaskFilesTransferUtils.tryDownloadUpstreamFiles(
+ taskChannel,
+ taskExecutionContext,
+ storageOperator);
+
+ Assertions.assertNotNull(downloadUpstreamFiles);
+ Assertions.assertEquals(2, downloadUpstreamFiles.size());
+
+ Assertions.assertEquals("folder", downloadUpstreamFiles.get(0).getProp());
+ Assertions.assertEquals("task1.folder", downloadUpstreamFiles.get(0).getValue());
+
+ Assertions.assertEquals("file", downloadUpstreamFiles.get(1).getProp());
+ Assertions.assertEquals("task2.file", downloadUpstreamFiles.get(1).getValue());
+ }
+
+ /**
+  * Feeds {@code TaskFilesTransferUtils.getOutFileLocalParams} a task-params JSON holding four
+  * local params (FILE/IN, FILE/OUT, VARCHAR/IN, VARCHAR/OUT) and expects only the FILE/OUT
+  * entry ("outputFile" -> "data") to be returned.
+  */
+ @Test
+ void testGetOutFileLocalParams() {
+     String taskParams = "{\"localParams\":[" +
+             "{\"prop\":\"inputFile\",\"direct\":\"IN\",\"type\":\"FILE\",\"value\":\"task1.data\"}," +
+             "{\"prop\":\"outputFile\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"data\"}," +
+             "{\"prop\":\"a\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"a\"}," +
+             "{\"prop\":\"b\",\"direct\":\"OUT\",\"type\":\"VARCHAR\",\"value\":\"b\"}" +
+             "]}";
+     // Only getTaskParams() is stubbed on the mocked context.
+     TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+     Mockito.when(taskExecutionContext.getTaskParams()).thenReturn(taskParams);
+
+     // Typed list: a raw List would expose elements as Object and hide getProp()/getValue().
+     List<Property> fileLocalParamsOut = TaskFilesTransferUtils.getOutFileLocalParams(taskExecutionContext);
+
+     Assertions.assertEquals(1, fileLocalParamsOut.size());
+     Property outputFileParam = fileLocalParamsOut.get(0);
+     Assertions.assertEquals("outputFile", outputFileParam.getProp());
+     Assertions.assertEquals("data", outputFileParam.getValue());
+ }
+
+ @Test
+ void testGetResourcePath() {
+ String fileName = "test.txt";
+ TaskExecutionContext taskExecutionContext = Mockito.mock(TaskExecutionContext.class);
+
+ Mockito.when(taskExecutionContext.getStartTime()).thenReturn(startTime);
+
+ Mockito.when(taskExecutionContext.getWorkflowDefinitionCode()).thenReturn(processDefineCode);
+ Mockito.when(taskExecutionContext.getWorkflowDefinitionVersion()).thenReturn(processDefineVersion);
+ Mockito.when(taskExecutionContext.getWorkflowInstanceId()).thenReturn(processInstanceId);
+ Mockito.when(taskExecutionContext.getTaskInstanceId()).thenReturn(taskInstanceId);
+ Mockito.when(taskExecutionContext.getTaskName()).thenReturn(taskName);
+
+ String except = String.format("%s_%s", exceptTemplate, fileName);
+ Assertions.assertEquals(except, TaskFilesTransferUtils.buildResourcePath(taskExecutionContext, fileName));
+
+ }
+
+ /**
+  * Expects {@code TaskFilesTransferUtils.packIfDirectory} to return a regular file's path
+  * unchanged (including one already named {@code *.zip}) and, for a directory, the directory
+  * path with {@code Constants.PACK_SUFFIX} appended.
+  *
+  * @param tempDir JUnit-managed temporary directory that hosts all fixtures
+  * @throws Exception if a fixture file cannot be created or packing fails
+  */
+ @Test
+ void testPackIfDirectory(@TempDir Path tempDir) throws Exception {
+     // Pack a directory nested inside tempDir rather than tempDir itself, so an archive written at
+     // "<dir>" + PACK_SUFFIX stays under the JUnit-managed directory and is cleaned up with it.
+     // NOTE(review): assumes packIfDirectory writes the archive at the returned path - confirm.
+     File folderPath = new File(tempDir.toFile(), "folder");
+     Assertions.assertTrue(folderPath.mkdir());
+
+     File file1 = new File(folderPath, "test.txt");
+     File file2 = new File(folderPath, "test.zip");
+     Assertions.assertTrue(file1.createNewFile());
+     Assertions.assertTrue(file2.createNewFile());
+
+     // Regular files are expected back as-is.
+     Assertions.assertEquals(file1.getPath(), TaskFilesTransferUtils.packIfDirectory(file1.getPath()));
+     Assertions.assertEquals(file2.getPath(), TaskFilesTransferUtils.packIfDirectory(file2.getPath()));
+
+     // A directory is expected to map to its path plus the pack suffix.
+     String expectFolderPackPath = folderPath.getPath() + Constants.PACK_SUFFIX;
+     Assertions.assertEquals(expectFolderPackPath, TaskFilesTransferUtils.packIfDirectory(folderPath.getPath()));
+ }
+}
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 2607bbaf6f97..0ed3b444f0d7 100644
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -425,4 +425,4 @@ sigmund-1.0.1.jar
wrappy-1.0.2.jar
yallist-2.1.2.jar
maven-wrapper.jar
-
+zt-zip-1.17.jar