Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
"timeout": 0,
"timeoutFlag": "CLOSE",
"timeoutNotifyStrategy": "",
"workerGroup": "default",
"workerGroup": "",
"cpuQuota": -1,
"memoryMax": -1,
"taskExecuteType": "BATCH"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.dto;

import lombok.Data;

@Data
public class ProjectPreferencesDTO {

private String taskPriority;

private String workerGroup;

private Long environmentCode;

private Integer failRetryTimes;

private Integer failRetryInterval;

private Integer cpuQuota;

private Integer memoryMax;

private Boolean timeoutFlag;

private String[] timeoutNotifyStrategy;

private Integer timeout;

private String warningType;

private String tenantCode;

private Integer warningGroupId;
}
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ public enum Status {
USED_WORKER_GROUP_EXISTS(1402004,
"You can not reassign worker groups to the project, cause these worker groups {0} are already used.",
"Worker组{0}被项目中任务或定时引用,无法重新分配"),
WORKER_GROUP_NOT_ASSIGNED_TO_PROJECT(1402005,
"Worker group [{0}] is not assigned to the project.",
"Worker组[{0}]未分配给项目"),
CREATE_WORKFLOW_LINEAGE_ERROR(1403001, "create workflow lineage error", "创建工作流血缘错误"),
UPDATE_WORKFLOW_LINEAGE_ERROR(1403002, "update workflow lineage error", "更新工作流血缘错误"),
DELETE_WORKFLOW_LINEAGE_ERROR(1403003, "delete workflow lineage error", "删除工作流血缘错误"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dolphinscheduler.dao.entity.User;

import java.util.List;
import java.util.Set;

public interface ProjectWorkerGroupRelationService {

Expand All @@ -43,4 +44,22 @@ public interface ProjectWorkerGroupRelationService {
*/
List<ProjectWorkerGroup> queryAssignedWorkerGroupsByProject(User loginUser, Long projectCode);

/**
* check if worker group is assigned to project
*
* @param projectCode project code
* @param workerGroup worker group name
* @return true if worker group is assigned to project
*/
boolean isWorkerGroupAssignedToProject(Long projectCode, String workerGroup);

/**
* get all assigned worker group names for a project
* This includes both directly assigned worker groups and worker groups used by tasks/schedules
*
* @param projectCode project code
* @return set of all assigned worker group names
*/
Set<String> getAllAssignedWorkerGroupNames(Long projectCode);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,24 @@

import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.PROJECT;

import org.apache.dolphinscheduler.api.dto.ProjectPreferencesDTO;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.ProjectPreferenceService;
import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.validator.WorkerGroupValidationContext;
import org.apache.dolphinscheduler.api.validator.WorkerGroupValidator;
import org.apache.dolphinscheduler.common.utils.CodeGenerateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
import org.apache.dolphinscheduler.dao.entity.ProjectPreference;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectPreferenceMapper;
import org.apache.dolphinscheduler.dao.repository.ProjectDao;

import org.apache.commons.lang3.StringUtils;

import java.util.Date;
import java.util.Objects;

Expand All @@ -55,6 +62,9 @@ public class ProjectPreferenceServiceImpl extends BaseServiceImpl
@Autowired
private ProjectDao projectDao;

@Autowired
private WorkerGroupValidator workerGroupValidator;

@Override
public Result updateProjectPreference(User loginUser, long projectCode, String preferences) {
Result result = new Result();
Expand All @@ -67,6 +77,27 @@ public Result updateProjectPreference(User loginUser, long projectCode, String p
.selectOne(new QueryWrapper<ProjectPreference>().lambda().eq(ProjectPreference::getProjectCode,
projectCode));

if (StringUtils.isNotEmpty(preferences)) {
try {
ProjectPreferencesDTO preferencesDTO = JSONUtils.parseObject(preferences, ProjectPreferencesDTO.class);
if (preferencesDTO != null && StringUtils.isNotEmpty(preferencesDTO.getWorkerGroup())) {
WorkerGroupValidationContext workerGroupContext = WorkerGroupValidationContext.builder()
.workerGroup(preferencesDTO.getWorkerGroup())
.projectCode(projectCode)
.build();
try {
workerGroupValidator.validate(workerGroupContext);
} catch (ServiceException e) {
putMsg(result, Status.WORKER_GROUP_NOT_ASSIGNED_TO_PROJECT, preferencesDTO.getWorkerGroup());
return result;
}
}
} catch (Exception e) {
log.warn("Failed to parse preferences JSON: {}", preferences, e);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw ServiceException here.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw ServiceException here.

ok

throw new ServiceException(Status.UPDATE_PROJECT_PREFERENCE_ERROR);
}
}

Date now = new Date();
if (Objects.isNull(projectPreference)) {
projectPreference = new ProjectPreference();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,22 @@ public List<ProjectWorkerGroup> queryAssignedWorkerGroupsByProject(User loginUse
}).distinct().collect(Collectors.toList());
}

@Override
public Set<String> getAllAssignedWorkerGroupNames(Long projectCode) {
Project project = projectDao.queryByCode(projectCode);
Set<String> assignedWorkerGroups = new TreeSet<>();

if (project != null) {
assignedWorkerGroups.addAll(getAllUsedWorkerGroups(project));
}

Set<String> directlyAssignedGroups =
projectWorkerGroupDao.queryAssignedWorkerGroupNamesByProjectCode(projectCode);
assignedWorkerGroups.addAll(directlyAssignedGroups);

return assignedWorkerGroups;
}

private Set<String> getAllUsedWorkerGroups(Project project) {
Set<String> usedWorkerGroups = new TreeSet<>();
// query all worker groups that tasks depend on
Expand All @@ -226,4 +242,12 @@ private Set<String> getAllUsedWorkerGroups(Project project) {
return usedWorkerGroups;
}

@Override
public boolean isWorkerGroupAssignedToProject(Long projectCode, String workerGroup) {
if (StringUtils.isEmpty(workerGroup)) {
return true;
}
return getAllAssignedWorkerGroupNames(projectCode).contains(workerGroup);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.validator.TenantExistValidator;
import org.apache.dolphinscheduler.api.validator.WorkerGroupValidationContext;
import org.apache.dolphinscheduler.api.validator.WorkerGroupValidator;
import org.apache.dolphinscheduler.api.vo.ScheduleVO;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
Expand Down Expand Up @@ -97,6 +99,9 @@ public class SchedulerServiceImpl extends BaseServiceImpl implements SchedulerSe
@Autowired
private TenantExistValidator tenantExistValidator;

@Autowired
private WorkerGroupValidator workerGroupValidator;

/**
* save schedule
*
Expand Down Expand Up @@ -182,6 +187,12 @@ public Schedule insertSchedule(User loginUser,
scheduleObj.setUserName(loginUser.getUserName());
scheduleObj.setReleaseState(ReleaseState.OFFLINE);
scheduleObj.setWorkflowInstancePriority(workflowInstancePriority);

WorkerGroupValidationContext workerGroupContext = WorkerGroupValidationContext.builder()
.workerGroup(workerGroup)
.projectCode(projectCode)
.build();
workerGroupValidator.validate(workerGroupContext);
scheduleObj.setWorkerGroup(workerGroup);
scheduleObj.setEnvironmentCode(environmentCode);
scheduleDao.insert(scheduleObj);
Expand Down Expand Up @@ -570,6 +581,11 @@ private Schedule updateSchedule(Schedule schedule, WorkflowDefinition workflowDe
schedule.setFailureStrategy(failureStrategy);
}

WorkerGroupValidationContext workerGroupContext = WorkerGroupValidationContext.builder()
.workerGroup(workerGroup)
.projectCode(workflowDefinition.getProjectCode())
.build();
workerGroupValidator.validate(workerGroupContext);
schedule.setWorkerGroup(workerGroup);
schedule.setEnvironmentCode(environmentCode);
schedule.setUpdateTime(now);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.dolphinscheduler.api.utils.PageInfo;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.api.validator.GlobalParamsValidator;
import org.apache.dolphinscheduler.api.validator.WorkerGroupValidator;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
import org.apache.dolphinscheduler.common.enums.UserType;
Expand Down Expand Up @@ -209,6 +210,9 @@ public class WorkflowDefinitionServiceImpl extends BaseServiceImpl implements Wo
@Autowired
private GlobalParamsValidator globalParamsValidator;

@Autowired
private WorkerGroupValidator workerGroupValidator;

/**
* create workflow definition
*
Expand Down Expand Up @@ -256,6 +260,10 @@ public WorkflowDefinition createWorkflowDefinition(User loginUser,
globalParamsValidator.validate(globalParams);

List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);

// Validate worker groups in task definitions
validateTaskWorkerGroups(projectCode, taskDefinitionLogs);

List<WorkflowTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);

long workflowDefinitionCode = CodeGenerateUtils.genCode();
Expand Down Expand Up @@ -381,6 +389,21 @@ private List<TaskDefinitionLog> generateTaskDefinitionList(String taskDefinition
}
}

/**
* Validate worker groups in task definitions
*/
private void validateTaskWorkerGroups(long projectCode, List<TaskDefinitionLog> taskDefinitionLogs) {
if (CollectionUtils.isEmpty(taskDefinitionLogs)) {
return;
}

List<String> workerGroups = taskDefinitionLogs.stream()
.map(TaskDefinitionLog::getWorkerGroup)
.collect(Collectors.toList());

workerGroupValidator.validate(workerGroups, projectCode);
}
Comment on lines +395 to +405
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of code should put into WorkerGroupValidator

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of code should put into WorkerGroupValidator

Okay, I will unify the logic.


private List<WorkflowTaskRelationLog> generateTaskRelationList(String taskRelationJson,
List<TaskDefinitionLog> taskDefinitionLogs) {
try {
Expand Down Expand Up @@ -626,6 +649,10 @@ public WorkflowDefinition updateWorkflowDefinition(User loginUser,
globalParamsValidator.validate(globalParams);

List<TaskDefinitionLog> taskDefinitionLogs = generateTaskDefinitionList(taskDefinitionJson);

// Validate worker groups in task definitions
validateTaskWorkerGroups(projectCode, taskDefinitionLogs);

List<WorkflowTaskRelationLog> taskRelationList = generateTaskRelationList(taskRelationJson, taskDefinitionLogs);

WorkflowDefinition workflowDefinition = workflowDefinitionDao.queryByCode(code).orElse(null);
Expand Down Expand Up @@ -1636,7 +1663,6 @@ public void switchWorkflowDefinitionVersion(User loginUser, long projectCode, lo
*
* @param srcProjectCode srcProjectCode
* @param targetProjectCode targetProjectCode
* @param result result
* @param failedWorkflowList failedWorkflowList
* @param isCopy isCopy
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.api.validator;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class WorkerGroupValidationContext {

private String workerGroup;

private long projectCode;

}
Loading
Loading