Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions dolphinscheduler-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@
<error.prone.annotations.version>2.11.0</error.prone.annotations.version>
<emr-serverless-spark.version>2.4.1</emr-serverless-spark.version>
<credentials-java.version>1.0.1</credentials-java.version>
<zt-zip.version>1.17</zt-zip.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -973,6 +974,12 @@
<version>${credentials-java.version}</version>
</dependency>

<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-zip</artifactId>
<version>${zt-zip.version}</version>
</dependency>

<!-- test dependencies on TestContainers -->
<dependency>
<groupId>org.testcontainers</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,4 +391,11 @@ public final class Constants {
public static final String RELEASE_STATE = "releaseState";
public static final String EXECUTE_TYPE = "executeType";

/**
* File parameter transfer
*/
public static final String CRC_SUFFIX = ".crc";
public static final String DOWNLOAD_TMP = ".DT_TMP";
public static final String PACK_SUFFIX = "_ds_pack.zip";
public static final String RESOURCE_TAG = "DATA_TRANSFER";
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.time.format.DateTimeFormatter;
import java.util.Calendar;
import java.util.Date;
import java.util.Objects;
import java.util.TimeZone;

import javax.annotation.Nonnull;
Expand Down Expand Up @@ -700,4 +701,14 @@ public static String getTimestampString() {
return String.valueOf(System.currentTimeMillis());
}

/**
* @param timeMillis timeMillis like System.currentTimeMillis()
* @param dateTimeFormatter expect formatter, like yyyy-MM-dd HH:mm:ss
* @return formatted string
*/
public static String formatTimeStamp(long timeMillis, DateTimeFormatter dateTimeFormatter) {
Objects.requireNonNull(dateTimeFormatter);
return dateTimeFormatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(timeMillis),
ZoneId.systemDefault()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,15 @@ private Map<String, Property> parseLocalParamsMap(AbstractParameters parameters)
Map<String, Property> localParametersMaps = new LinkedHashMap<>();
if (CollectionUtils.isNotEmpty(parameters.getLocalParams())) {
parameters.getLocalParams()
.forEach(localParam -> localParametersMaps.put(localParam.getProp(), localParam));
.forEach(localParam -> {
// If the parameter type is FILE and the direction is IN,
// then use the parameter value as map key.
if (DataType.FILE.equals(localParam.getType()) && Direct.IN.equals(localParam.getDirect())) {
localParametersMaps.put(localParam.getValue(), localParam);
} else {
localParametersMaps.put(localParam.getProp(), localParam);
}
});
}
return localParametersMaps;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,60 @@
String.valueOf(workflowDefinition.getCode()));
}

@Test
public void testParamParsingPreparation_withLocalFileParams() {

Check warning on line 236 in dolphinscheduler-service/src/test/java/org/apache/dolphinscheduler/service/expand/CuringParamsServiceImplTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ5EA9PsTfc-xpUC5gDz&open=AZ5EA9PsTfc-xpUC5gDz&pullRequest=18273
TaskInstance taskInstance = new TaskInstance();
taskInstance.setId(1);
taskInstance.setExecutePath("home/path/execute");

TaskDefinition taskDefinition = new TaskDefinition();
taskDefinition.setName("TaskName-1");
taskDefinition.setCode(1000001L);

WorkflowInstance workflowInstance = new WorkflowInstance();
workflowInstance.setId(2);
final BackfillWorkflowCommandParam backfillWorkflowCommandParam = BackfillWorkflowCommandParam.builder()
.timeZone("Asia/Shanghai")
.build();
workflowInstance.setHistoryCmd(CommandType.COMPLEMENT_DATA.toString());
workflowInstance.setCommandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam));
workflowInstance.setGlobalParams("");

WorkflowDefinition workflowDefinition = new WorkflowDefinition();
workflowDefinition.setName("ProcessName-1");
workflowDefinition.setProjectName("ProjectName");
workflowDefinition.setProjectCode(3000001L);
workflowDefinition.setCode(200001L);

Project project = new Project();
project.setName("ProjectName");
project.setCode(3000001L);

workflowInstance.setWorkflowDefinitionCode(workflowDefinition.getCode());
workflowInstance.setProjectCode(workflowDefinition.getProjectCode());

String varPoolParams = "[{\"prop\":\"task1.file\",\"direct\":\"OUT\",\"type\":\"FILE\",\"value\":\"file\"}]";
taskInstance.setTaskCode(taskDefinition.getCode());
taskInstance.setTaskDefinitionVersion(taskDefinition.getVersion());
taskInstance.setProjectCode(workflowDefinition.getProjectCode());
taskInstance.setWorkflowInstanceId(workflowInstance.getId());
taskInstance.setVarPool(varPoolParams);

Property fileInParam = new Property("file_new", Direct.IN, DataType.FILE, "task1.file");
AbstractParameters parameters = Mockito.mock(AbstractParameters.class);
Mockito.when(parameters.getLocalParams()).thenReturn(Lists.newArrayList(fileInParam));

Mockito.when(projectParameterMapper.queryByProjectCode(Mockito.anyLong())).thenReturn(Collections.emptyList());

Map<String, Property> propertyMap =
curingParamsServiceImpl.paramParsingPreparation(taskInstance, parameters, workflowInstance,
project.getName(), workflowDefinition.getName());
Assertions.assertNotNull(propertyMap);

Assertions.assertEquals("file", propertyMap.get("task1.file").getValue());
Assertions.assertEquals("file_new", propertyMap.get("task1.file").getProp());
}

@Test
public void testParseGlobalParamsMap() throws Exception {
WorkflowInstance workflowInstance = new WorkflowInstance();
Expand Down
4 changes: 4 additions & 0 deletions dolphinscheduler-worker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@
<artifactId>dolphinscheduler-actuator-authentication</artifactId>
</dependency>

<dependency>
<groupId>org.zeroturnaround</groupId>
<artifactId>zt-zip</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator;
import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskChannel;
import org.apache.dolphinscheduler.plugin.task.api.log.TaskLogMarkers;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.utils.TaskExecutionContextUtils;
import org.apache.dolphinscheduler.server.worker.utils.TaskFilesTransferUtils;
import org.apache.dolphinscheduler.server.worker.utils.TenantUtils;
import org.apache.dolphinscheduler.task.executor.AbstractTaskExecutor;
import org.apache.dolphinscheduler.task.executor.ITaskExecutor;
Expand All @@ -36,6 +39,7 @@
import org.apache.dolphinscheduler.task.executor.events.TaskExecutorRuntimeContextChangedLifecycleEvent;

import java.util.ArrayList;
import java.util.List;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -71,6 +75,22 @@ protected void initializeTaskPlugin() {
log.info("Set taskVarPool: {} successfully", taskExecutionContext.getVarPool());
}

@Override
public TaskExecutorState trackTaskExecutorState() {
TaskExecutorState state = super.trackTaskExecutorState();
if (TaskExecutorState.SUCCEEDED.equals(state)) {
uploadOutputFilesIfNeeded();
}
return state;
}

private void uploadOutputFilesIfNeeded() {
List<Property> uploadOutputFiles = TaskFilesTransferUtils.tryUploadOutputFiles(
taskExecutionContext,
storageOperator);
log.info("Upload output files successfully: {}", uploadOutputFiles);
}

@Override
protected void doTriggerTaskPlugin() {
final ITaskExecutor taskExecutor = this;
Expand Down Expand Up @@ -130,13 +150,20 @@ protected void initializeTaskContext() {
TaskExecutionContextUtils.createTaskInstanceWorkingDirectory(taskExecutionContext);
log.info("TaskInstance working directory: {} create successfully", taskExecutionContext.getExecutePath());

TaskChannel taskChannel = physicalTaskPluginFactory.getTaskChannel(this);
final ResourceContext resourceContext = TaskExecutionContextUtils.downloadResourcesIfNeeded(
physicalTaskPluginFactory.getTaskChannel(this),
taskChannel,
storageOperator,
taskExecutionContext);
taskExecutionContext.setResourceContext(resourceContext);
log.info("Download resources successfully: {}", taskExecutionContext.getResourceContext());

List<Property> downloadUpstreamFiles = TaskFilesTransferUtils.tryDownloadUpstreamFiles(
taskChannel,
taskExecutionContext,
storageOperator);
log.info("Download upstream files successfully: {}", downloadUpstreamFiles);

log.info(TaskLogMarkers.excludeInTaskLog(), "Initialized Task Context{}",
JSONUtils.toPrettyJsonString(taskExecutionContext));

Expand Down
Loading
Loading