Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,8 @@ public Result<Map<Long, List<TaskDefinition>>> getNodeListMapByDefinitionCodes(@
@ApiException(GET_TASKS_LIST_BY_WORKFLOW_DEFINITION_CODE_ERROR)
public Result<List<DependentSimplifyDefinition>> getWorkflowListByProjectCode(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode) {
return Result.success(workflowDefinitionService.queryWorkflowDefinitionListByProjectCode(projectCode));
return Result.success(
workflowDefinitionService.queryWorkflowDefinitionListByProjectCode(loginUser, projectCode));
}

/**
Expand All @@ -590,7 +591,7 @@ public Result<List<DependentSimplifyDefinition>> getTaskListByWorkflowDefinition
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@RequestParam(value = "workflowDefinitionCode") Long workflowDefinitionCode) {
return Result.success(workflowDefinitionService
.queryTaskDefinitionListByWorkflowDefinitionCode(projectCode, workflowDefinitionCode));
.queryTaskDefinitionListByWorkflowDefinitionCode(loginUser, projectCode, workflowDefinitionCode));
}

@Operation(summary = "deleteByWorkflowDefinitionCode", description = "DELETE_WORKFLOW_DEFINITION_BY_ID_NOTES")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public Result<List<WorkFlowRelationDetail>> queryWorkFlowLineageByName(@Paramete
@RequestParam(value = "workflowDefinitionName", required = false) String workflowDefinitionName) {
workflowDefinitionName = ParameterUtils.handleEscapes(workflowDefinitionName);
List<WorkFlowRelationDetail> workFlowLineages =
workflowLineageService.queryWorkFlowLineageByName(projectCode, workflowDefinitionName);
workflowLineageService.queryWorkFlowLineageByName(loginUser, projectCode, workflowDefinitionName);
return Result.success(workFlowLineages);
}

Expand All @@ -86,7 +86,8 @@ public Result<List<WorkFlowRelationDetail>> queryWorkFlowLineageByName(@Paramete
public Result<Map<String, Object>> queryWorkFlowLineageByCode(@Parameter(hidden = true) @RequestAttribute(value = SESSION_USER) User loginUser,
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable(value = "workFlowCode") long workFlowCode) {
WorkFlowLineage workFlowLineage = workflowLineageService.queryWorkFlowLineageByCode(projectCode, workFlowCode);
WorkFlowLineage workFlowLineage =
workflowLineageService.queryWorkFlowLineageByCode(loginUser, projectCode, workFlowCode);
Map<String, Object> result = new HashMap<>();
result.put(Constants.DATA_LIST, workFlowLineage);
return Result.success(result);
Expand All @@ -100,7 +101,7 @@ public Result<Map<String, Object>> queryWorkFlowLineage(@Parameter(hidden = true
@Parameter(name = "projectCode", description = "PROJECT_CODE", required = true) @PathVariable long projectCode) {
try {
Map<String, Object> result = new HashMap<>();
WorkFlowLineage workFlowLineage = workflowLineageService.queryWorkFlowLineage(projectCode);
WorkFlowLineage workFlowLineage = workflowLineageService.queryWorkFlowLineage(loginUser, projectCode);
result.put(Constants.DATA_LIST, workFlowLineage);
return Result.success(result);
} catch (Exception e) {
Expand Down Expand Up @@ -133,7 +134,7 @@ public Result<Map<String, Object>> verifyTaskCanDelete(@Parameter(hidden = true)
@RequestParam(value = "taskCode") long taskCode) {
Result<Map<String, Object>> result = new Result<>();
Optional<String> taskDepMsg =
workflowLineageService.taskDependentMsg(projectCode, workflowDefinitionCode, taskCode);
workflowLineageService.taskDependentMsg(loginUser, projectCode, workflowDefinitionCode, taskCode);
if (taskDepMsg.isPresent()) {
throw new ServiceException(taskDepMsg.get());
}
Expand All @@ -158,7 +159,8 @@ public Result<Map<String, Object>> queryDependentTasks(@Parameter(hidden = true)
@RequestParam(value = "taskCode", required = false) Long taskCode) {
Map<String, Object> result = new HashMap<>();
List<DependentLineageTask> dependentLineageTaskList =
workflowLineageService.queryDependentWorkflowDefinitions(projectCode, workFlowCode, taskCode);
workflowLineageService.queryDependentWorkflowDefinitions(loginUser, projectCode, workFlowCode,
taskCode);
result.put(Constants.DATA_LIST, dependentLineageTaskList);
return Result.success(result);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,12 +171,12 @@ Map<Long, List<TaskDefinition>> getNodeListMapByDefinitionCodes(User loginUser,
/**
* query workflow definition list by project code (simplified records used for dependent task picker)
*/
List<DependentSimplifyDefinition> queryWorkflowDefinitionListByProjectCode(long projectCode);
List<DependentSimplifyDefinition> queryWorkflowDefinitionListByProjectCode(User loginUser, long projectCode);

/**
* query task definition list (simplified records) by workflow definition code
*/
List<DependentSimplifyDefinition> queryTaskDefinitionListByWorkflowDefinitionCode(long projectCode,
List<DependentSimplifyDefinition> queryTaskDefinitionListByWorkflowDefinitionCode(User loginUser, long projectCode,
Long workflowDefinitionCode);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
import org.apache.dolphinscheduler.dao.entity.WorkflowDefinition;
Expand All @@ -29,11 +30,12 @@

public interface WorkflowLineageService {

List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long projectCode, String workflowDefinitionName);
List<WorkFlowRelationDetail> queryWorkFlowLineageByName(User loginUser, long projectCode,
String workflowDefinitionName);

WorkFlowLineage queryWorkFlowLineageByCode(long projectCode, long workflowDefinitionCode);
WorkFlowLineage queryWorkFlowLineageByCode(User loginUser, long projectCode, long workflowDefinitionCode);

WorkFlowLineage queryWorkFlowLineage(long projectCode);
WorkFlowLineage queryWorkFlowLineage(User loginUser, long projectCode);

/**
* Query downstream tasks depend on a workflow definition or a task
Expand Down Expand Up @@ -70,9 +72,10 @@ List<WorkflowDefinition> resolveDownstreamWorkflowDefinitionCodes(long rootWorkf
* @param taskCode Task code want to query tasks dependence
* @return dependent workflow definition
*/
Optional<String> taskDependentMsg(long projectCode, long workflowDefinitionCode, long taskCode);
Optional<String> taskDependentMsg(User loginUser, long projectCode, long workflowDefinitionCode, long taskCode);

List<DependentLineageTask> queryDependentWorkflowDefinitions(long projectCode, long workflowDefinitionCode,
List<DependentLineageTask> queryDependentWorkflowDefinitions(User loginUser, long projectCode,
long workflowDefinitionCode,
Long taskCode);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ public WorkflowDefinition updateWorkflowDefinition(User loginUser,
* @param workflowDefinition WorkflowDefinition you change task definition and task relation
* @param taskRelationList All the latest task relation list from workflow definition
*/
private void taskUsedInOtherTaskValid(WorkflowDefinition workflowDefinition,
private void taskUsedInOtherTaskValid(User loginUser, WorkflowDefinition workflowDefinition,
List<WorkflowTaskRelationLog> taskRelationList) {
List<WorkflowTaskRelation> oldWorkflowTaskRelationList =
workflowTaskRelationDao.queryByWorkflowDefinitionCode(workflowDefinition.getCode());
Expand All @@ -677,7 +677,8 @@ private void taskUsedInOtherTaskValid(WorkflowDefinition workflowDefinition,
.anyMatch(relation -> oldWorkflowTaskRelation.getPostTaskCode() == relation.getPostTaskCode());
if (!oldTaskExists) {
Optional<String> taskDepMsg = workflowLineageService.taskDependentMsg(
workflowDefinition.getProjectCode(), oldWorkflowTaskRelation.getWorkflowDefinitionCode(),
loginUser, workflowDefinition.getProjectCode(),
oldWorkflowTaskRelation.getWorkflowDefinitionCode(),
oldWorkflowTaskRelation.getPostTaskCode());
taskDepMsg.ifPresent(sb::append);
}
Expand Down Expand Up @@ -745,7 +746,7 @@ protected WorkflowDefinition updateDagDefine(User loginUser,
workflowDefinition.getCode(), insertVersion);
workflowDefinition.setVersion(insertVersion);

taskUsedInOtherTaskValid(workflowDefinition, taskRelationList);
taskUsedInOtherTaskValid(loginUser, workflowDefinition, taskRelationList);
int insertResult = processService.saveTaskRelation(loginUser, workflowDefinition.getProjectCode(),
workflowDefinition.getCode(), insertVersion, taskRelationList, taskDefinitionLogs, Boolean.TRUE);
if (insertResult != Constants.EXIT_CODE_SUCCESS) {
Expand Down Expand Up @@ -833,7 +834,7 @@ public void batchDeleteWorkflowDefinitionByCodes(User loginUser, long projectCod
*
* @param workflowDefinition WorkflowDefinition you change task definition and task relation
*/
private void workflowDefinitionUsedInOtherTaskValid(WorkflowDefinition workflowDefinition) {
private void workflowDefinitionUsedInOtherTaskValid(User loginUser, WorkflowDefinition workflowDefinition) {
// check workflow definition is already online
if (workflowDefinition.getReleaseState() == ReleaseState.ONLINE) {
throw new ServiceException(Status.WORKFLOW_DEFINE_STATE_ONLINE, workflowDefinition.getName());
Expand All @@ -847,8 +848,8 @@ private void workflowDefinitionUsedInOtherTaskValid(WorkflowDefinition workflowD
}

// check workflow used by other task, including sub workflow and dependent task type
Optional<String> taskDepMsg = workflowLineageService.taskDependentMsg(workflowDefinition.getProjectCode(),
workflowDefinition.getCode(), 0);
Optional<String> taskDepMsg = workflowLineageService.taskDependentMsg(loginUser,
workflowDefinition.getProjectCode(), workflowDefinition.getCode(), 0);

if (taskDepMsg.isPresent()) {
String errorMeg = "workflow definition cannot be deleted because it has dependent, " + taskDepMsg.get();
Expand All @@ -870,7 +871,7 @@ public void deleteWorkflowDefinitionByCode(User loginUser, long code) {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}

workflowDefinitionUsedInOtherTaskValid(workflowDefinition);
workflowDefinitionUsedInOtherTaskValid(loginUser, workflowDefinition);

// get the timing according to the workflow definition
Schedule scheduleObj = scheduleDao.queryByWorkflowDefinitionCode(code);
Expand Down Expand Up @@ -1050,7 +1051,10 @@ public List<DagData> queryAllWorkflowDefinitionByProjectCode(User loginUser, lon
* @return workflow definition list in the project
*/
@Override
public List<DependentSimplifyDefinition> queryWorkflowDefinitionListByProjectCode(long projectCode) {
public List<DependentSimplifyDefinition> queryWorkflowDefinitionListByProjectCode(User loginUser,
long projectCode) {
Project project = projectDao.queryByCode(projectCode);
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
return workflowDefinitionDao.queryDefinitionListByProjectCodeAndWorkflowDefinitionCodes(projectCode, null);
}

Expand All @@ -1062,8 +1066,11 @@ public List<DependentSimplifyDefinition> queryWorkflowDefinitionListByProjectCod
* @return task definition list in the workflow definition
*/
@Override
public List<DependentSimplifyDefinition> queryTaskDefinitionListByWorkflowDefinitionCode(long projectCode,
public List<DependentSimplifyDefinition> queryTaskDefinitionListByWorkflowDefinitionCode(User loginUser,
long projectCode,
Long workflowDefinitionCode) {
Project project = projectDao.queryByCode(projectCode);
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
Set<Long> definitionCodesSet = new HashSet<>();
definitionCodesSet.add(workflowDefinitionCode);
List<DependentSimplifyDefinition> workflowDefinitions = workflowDefinitionDao
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@

package org.apache.dolphinscheduler.api.service.impl;

import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.WORKFLOW_DEFINITION;

import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.service.WorkflowLineageService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.dao.entity.DependentLineageTask;
import org.apache.dolphinscheduler.dao.entity.DependentWorkflowDefinition;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkFlowLineage;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelation;
import org.apache.dolphinscheduler.dao.entity.WorkFlowRelationDetail;
Expand Down Expand Up @@ -75,21 +79,28 @@
@Autowired
private WorkflowDefinitionDao workflowDefinitionDao;

@Autowired

Check warning on line 82 in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowLineageServiceImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this field injection and use constructor injection instead.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZ5OlKH6CmNIQl9ZWsE0&open=AZ5OlKH6CmNIQl9ZWsE0&pullRequest=18284
private ProjectService projectService;

@Override
public List<WorkFlowRelationDetail> queryWorkFlowLineageByName(long projectCode, String workflowDefinitionName) {
public List<WorkFlowRelationDetail> queryWorkFlowLineageByName(User loginUser, long projectCode,
String workflowDefinitionName) {
Project project = projectDao.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
return workflowTaskLineageDao.queryWorkFlowLineageByName(projectCode, workflowDefinitionName);
}

@Override
public WorkFlowLineage queryWorkFlowLineageByCode(long projectCode, long workflowDefinitionCode) {
public WorkFlowLineage queryWorkFlowLineageByCode(User loginUser, long projectCode,
long workflowDefinitionCode) {
Project project = projectDao.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
List<WorkflowTaskLineage> upstreamWorkflowTaskLineageList =
workflowTaskLineageDao.queryByWorkflowDefinitionCode(workflowDefinitionCode);
List<WorkflowTaskLineage> downstreamWorkflowTaskLineageList =
Expand Down Expand Up @@ -117,11 +128,12 @@
}

@Override
public WorkFlowLineage queryWorkFlowLineage(long projectCode) {
public WorkFlowLineage queryWorkFlowLineage(User loginUser, long projectCode) {
Project project = projectDao.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
List<WorkflowTaskLineage> workflowTaskLineageList = workflowTaskLineageDao.queryByProjectCode(projectCode);
List<WorkFlowRelation> workFlowRelationList = getWorkFlowRelations(workflowTaskLineageList);
List<WorkFlowRelationDetail> workFlowRelationDetailList =
Expand Down Expand Up @@ -174,7 +186,13 @@
* @return Optional of formatter message
*/
@Override
public Optional<String> taskDependentMsg(long projectCode, long workflowDefinitionCode, long taskCode) {
public Optional<String> taskDependentMsg(User loginUser, long projectCode, long workflowDefinitionCode,
long taskCode) {
Project project = projectDao.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
long queryTaskCode = 0;
if (taskCode != 0) {
queryTaskCode = taskCode;
Expand Down Expand Up @@ -285,12 +303,14 @@
}

@Override
public List<DependentLineageTask> queryDependentWorkflowDefinitions(long projectCode, long workflowDefinitionCode,
public List<DependentLineageTask> queryDependentWorkflowDefinitions(User loginUser, long projectCode,
long workflowDefinitionCode,
Long taskCode) {
Project project = projectDao.queryByCode(projectCode);
if (project == null) {
throw new ServiceException(Status.PROJECT_NOT_FOUND, projectCode);
}
projectService.checkProjectAndAuthThrowException(loginUser, project, WORKFLOW_DEFINITION);
List<DependentLineageTask> dependentLineageTaskList = new ArrayList<>();
List<WorkflowTaskLineage> workflowTaskLineageList =
workflowTaskLineageDao.queryWorkFlowLineageByDept(projectCode,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ private void putMsg(Map<String, Object> result, Status status, Object... statusP
public void testQueryWorkFlowLineageByName() {
long projectCode = 1L;
String searchVal = "test";
Mockito.when(workFlowLineageService.queryWorkFlowLineageByName(projectCode, searchVal))
Mockito.when(workFlowLineageService.queryWorkFlowLineageByName(user, projectCode, searchVal))
.thenReturn(Collections.emptyList());
assertDoesNotThrow(() -> workflowLineageController.queryWorkFlowLineageByName(user, projectCode, searchVal));
}
Expand Down
Loading
Loading