Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import org.apache.dolphinscheduler.api.audit.operator.BaseAuditOperator;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.repository.QueueDao;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
Expand All @@ -28,7 +28,7 @@
public class YarnQueueAuditOperatorImpl extends BaseAuditOperator {

@Autowired
private QueueMapper queueMapper;
private QueueDao queueDao;

@Override
public String getObjectNameFromIdentity(Object identity) {
Expand All @@ -37,7 +37,7 @@ public String getObjectNameFromIdentity(Object identity) {
return "";
}

Queue obj = queueMapper.selectById(objId);
Queue obj = queueDao.queryById(objId);
return obj == null ? "" : obj.getQueueName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,11 @@
import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
import org.apache.dolphinscheduler.dao.mapper.AlertPluginInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskGroupMapper;
import org.apache.dolphinscheduler.dao.repository.DataSourceDao;
import org.apache.dolphinscheduler.dao.repository.K8sNamespaceDao;
import org.apache.dolphinscheduler.dao.repository.ProjectDao;
import org.apache.dolphinscheduler.dao.repository.QueueDao;
import org.apache.dolphinscheduler.dao.repository.TenantDao;
import org.apache.dolphinscheduler.dao.repository.UserDao;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;
Expand Down Expand Up @@ -126,10 +126,10 @@ public Set<Object> userOwnedResourceIdsAcquisition(Object authorizationType, Int
@Component
public static class QueueResourcePermissionCheck implements ResourceAcquisitionAndPermissionCheck<Integer> {

private final QueueMapper queueMapper;
private final QueueDao queueDao;

public QueueResourcePermissionCheck(QueueMapper queueMapper) {
this.queueMapper = queueMapper;
public QueueResourcePermissionCheck(QueueDao queueDao) {
this.queueDao = queueDao;
}

@Override
Expand All @@ -147,7 +147,7 @@ public Set<Integer> listAuthorizedResourceIds(int userId, Logger logger) {
if (userId != 0) {
return Collections.emptySet();
}
List<Queue> queues = queueMapper.selectList(null);
List<Queue> queues = queueDao.queryAll();
return queues.stream().map(Queue::getId).collect(toSet());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.repository.QueueDao;
import org.apache.dolphinscheduler.dao.repository.TenantDao;
import org.apache.dolphinscheduler.dao.repository.UserDao;

Expand All @@ -57,7 +57,7 @@
public class QueueServiceImpl extends BaseServiceImpl implements QueueService {

@Autowired
private QueueMapper queueMapper;
private QueueDao queueDao;

@Autowired
private UserDao userDao;
Expand Down Expand Up @@ -124,7 +124,7 @@ public List<Queue> queryList(User loginUser) {
ids = ids.isEmpty() ? new HashSet<>() : ids;
ids.add(Constants.DEFAULT_QUEUE_ID);
}
return queueMapper.selectBatchIds(ids);
return queueDao.queryByIds(ids);
}

/**
Expand All @@ -145,7 +145,7 @@ public PageInfo<Queue> queryList(User loginUser, String searchVal, Integer pageN
return pageInfo;
}
Page<Queue> page = new Page<>(pageNo, pageSize);
IPage<Queue> queueList = queueMapper.queryQueuePaging(page, new ArrayList<>(ids), searchVal);
IPage<Queue> queueList = queueDao.queryQueuePaging(page, new ArrayList<>(ids), searchVal);
Integer count = (int) queueList.getTotal();
pageInfo.setTotal(count);
pageInfo.setTotalList(queueList.getRecords());
Expand All @@ -168,7 +168,7 @@ public Queue createQueue(User loginUser, String queue, String queueName) {

Queue queueObj = new Queue(queueName, queue);
validQueue(queueObj);
queueMapper.insert(queueObj);
queueDao.insert(queueObj);

return queueObj;
}
Expand All @@ -190,7 +190,7 @@ public Queue updateQueue(User loginUser, int id, String queue, String queueName)

Queue updateQueue = new Queue(id, queueName, queue);
updateQueue.setCreateTime(null);
Queue existsQueue = queueMapper.selectById(id);
Queue existsQueue = queueDao.queryById(id);
updateQueueValid(existsQueue, updateQueue);

// check old queue using by any user
Expand All @@ -201,7 +201,7 @@ public Queue updateQueue(User loginUser, int id, String queue, String queueName)
log.info("Old queue have related {} users, exec update user success.", relatedUserNums);
}

queueMapper.updateById(updateQueue);
queueDao.updateById(updateQueue);
return updateQueue;
}

Expand All @@ -220,7 +220,7 @@ public void deleteQueueById(User loginUser, int id) throws Exception {
throw new ServiceException(Status.USER_NO_OPERATION_PERM);
}

Queue queue = queueMapper.selectById(id);
Queue queue = queueDao.queryById(id);
if (Objects.isNull(queue)) {
log.error("Queue does not exist");
throw new ServiceException(Status.QUEUE_NOT_EXIST);
Expand All @@ -238,8 +238,7 @@ public void deleteQueueById(User loginUser, int id) throws Exception {
throw new ServiceException(Status.DELETE_QUEUE_BY_ID_FAIL_USERS, userList.size());
}

int delete = queueMapper.deleteById(id);
if (delete <= 0) {
if (!queueDao.deleteById(id)) {
throw new ServiceException(Status.DELETE_QUEUE_BY_ID_ERROR);
}

Expand Down Expand Up @@ -267,7 +266,7 @@ public void verifyQueue(String queue, String queueName) {
* @return true if the queue not exists, otherwise return false
*/
private boolean checkQueueExist(String queue) {
return queueMapper.existQueue(queue, null) == Boolean.TRUE;
return queueDao.existQueue(queue, null);
}

/**
Expand All @@ -278,7 +277,7 @@ private boolean checkQueueExist(String queue) {
* @return true if the queue name not exists, otherwise return false
*/
private boolean checkQueueNameExist(String queueName) {
return queueMapper.existQueue(null, queueName) == Boolean.TRUE;
return queueDao.existQueue(null, queueName);
}

/**
Expand All @@ -304,14 +303,14 @@ private boolean checkIfQueueIsInUsing(String oldQueue, String newQueue) {
*/
@Override
public Queue createQueueIfNotExists(String queue, String queueName) {
Queue existsQueue = queueMapper.queryQueueName(queue, queueName);
Queue existsQueue = queueDao.queryQueueName(queue, queueName);
if (!Objects.isNull(existsQueue)) {
log.info("Queue exists, so return it, queueName:{}.", queueName);
return existsQueue;
}
Queue queueObj = new Queue(queueName, queue);
validQueue(queueObj);
queueMapper.insert(queueObj);
queueDao.insert(queueObj);
log.info("Queue create complete, queueName:{}.", queueObj.getQueueName());
return queueObj;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.repository.QueueDao;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -45,7 +45,7 @@ public class QueueResourcePermissionCheckTest {
private ResourcePermissionCheckServiceImpl.QueueResourcePermissionCheck queueResourcePermissionCheck;

@Mock
private QueueMapper queueMapper;
private QueueDao queueDao;

@Test
public void testPermissionCheck() {
Expand All @@ -62,7 +62,7 @@ public void testAuthorizationTypes() {
@Test
public void testListAuthorizedResourceIds() {
Queue queue = new Queue();
Mockito.when(queueMapper.selectList(null)).thenReturn(Arrays.asList(queue));
Mockito.when(queueDao.queryAll()).thenReturn(Arrays.asList(queue));
// GENERAL_USER
User user = getLoginUser();
Assertions.assertEquals(0, queueResourcePermissionCheck.listAuthorizedResourceIds(user.getId(), logger).size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.repository.QueueDao;
import org.apache.dolphinscheduler.dao.repository.UserDao;

import org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -73,7 +73,7 @@ public class QueueServiceTest {
private QueueServiceImpl queueService;

@Mock
private QueueMapper queueMapper;
private QueueDao queueDao;

@Mock
private UserDao userDao;
Expand Down Expand Up @@ -101,7 +101,7 @@ public void testQueryList() {
ids.add(1);
when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.QUEUE,
getLoginUser().getId(), queueServiceImplLogger)).thenReturn(ids);
when(queueMapper.selectBatchIds(Mockito.anySet())).thenReturn(getQueueList());
when(queueDao.queryByIds(Mockito.anySet())).thenReturn(getQueueList());
assertDoesNotThrow(() -> queueService.queryList(getLoginUser()));

}
Expand All @@ -116,7 +116,7 @@ public void testQueryListPage() {
ids.add(1);
when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.QUEUE,
getLoginUser().getId(), queueServiceImplLogger)).thenReturn(ids);
when(queueMapper.queryQueuePaging(Mockito.any(Page.class), Mockito.anyList(), Mockito.eq(QUEUE_NAME)))
when(queueDao.queryQueuePaging(Mockito.any(Page.class), Mockito.anyList(), Mockito.eq(QUEUE_NAME)))
.thenReturn(page);
PageInfo<Queue> queuePageInfo = queueService.queryList(getLoginUser(), QUEUE_NAME, 1, 10);
Assertions.assertTrue(CollectionUtils.isNotEmpty(queuePageInfo.getTotalList()));
Expand Down Expand Up @@ -147,9 +147,9 @@ public void testCreateQueue() {

@Test
public void testUpdateQueue() {
when(queueMapper.selectById(1)).thenReturn(getQUEUE());
when(queueMapper.existQueue(EXISTS, null)).thenReturn(true);
when(queueMapper.existQueue(null, EXISTS)).thenReturn(true);
when(queueDao.queryById(1)).thenReturn(getQUEUE());
when(queueDao.existQueue(EXISTS, null)).thenReturn(true);
when(queueDao.existQueue(null, EXISTS)).thenReturn(true);
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.QUEUE,
getLoginUser().getId(), YARN_QUEUE_UPDATE, baseServiceLogger)).thenReturn(true);
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.QUEUE, new Object[]{0}, 0,
Expand Down Expand Up @@ -187,11 +187,11 @@ public void testUpdateQueue() {
Assertions.assertNull(queue.getCreateTime());

// success update with same queue name
when(queueMapper.existQueue(NOT_EXISTS_FINAL, null)).thenReturn(false);
when(queueDao.existQueue(NOT_EXISTS_FINAL, null)).thenReturn(false);
assertDoesNotThrow(() -> queueService.updateQueue(getLoginUser(), 1, NOT_EXISTS_FINAL, NOT_EXISTS));

// success update with same queue value
when(queueMapper.existQueue(null, NOT_EXISTS_FINAL)).thenReturn(false);
when(queueDao.existQueue(null, NOT_EXISTS_FINAL)).thenReturn(false);
assertDoesNotThrow(() -> queueService.updateQueue(getLoginUser(), 1, NOT_EXISTS, NOT_EXISTS_FINAL));
}

Expand All @@ -209,13 +209,13 @@ public void testVerifyQueue() {
Assertions.assertEquals(formatter, exception.getMessage());

// exist queueName
when(queueMapper.existQueue(EXISTS, null)).thenReturn(true);
when(queueDao.existQueue(EXISTS, null)).thenReturn(true);
exception = Assertions.assertThrows(ServiceException.class, () -> queueService.verifyQueue(EXISTS, QUEUE_NAME));
formatter = MessageFormat.format(Status.QUEUE_VALUE_EXIST.getMsg(), EXISTS);
Assertions.assertEquals(formatter, exception.getMessage());

// exist queue
when(queueMapper.existQueue(null, EXISTS)).thenReturn(true);
when(queueDao.existQueue(null, EXISTS)).thenReturn(true);
exception = Assertions.assertThrows(ServiceException.class, () -> queueService.verifyQueue(QUEUE, EXISTS));
formatter = MessageFormat.format(Status.QUEUE_NAME_EXIST.getMsg(), EXISTS);
Assertions.assertEquals(formatter, exception.getMessage());
Expand All @@ -229,12 +229,12 @@ public void testCreateQueueIfNotExists() {
Queue queue;

// queue exists
when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(getQUEUE());
when(queueDao.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(getQUEUE());
queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
Assertions.assertEquals(getQUEUE(), queue);

// queue not exists
when(queueMapper.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(null);
when(queueDao.queryQueueName(QUEUE, QUEUE_NAME)).thenReturn(null);
queue = queueService.createQueueIfNotExists(QUEUE, QUEUE_NAME);
Assertions.assertEquals(new Queue(QUEUE_NAME, QUEUE), queue);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.repository;

import org.apache.dolphinscheduler.dao.entity.Queue;

import java.util.List;

import com.baomidou.mybatisplus.core.metadata.IPage;

public interface QueueDao extends IDao<Queue> {

IPage<Queue> queryQueuePaging(IPage<Queue> page, List<Integer> ids, String searchVal);

boolean existQueue(String queue, String queueName);

Queue queryQueueName(String queue, String queueName);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.dao.repository.impl;

import org.apache.dolphinscheduler.dao.entity.Queue;
import org.apache.dolphinscheduler.dao.mapper.QueueMapper;
import org.apache.dolphinscheduler.dao.repository.BaseDao;
import org.apache.dolphinscheduler.dao.repository.QueueDao;

import java.util.List;

import lombok.NonNull;

import org.springframework.stereotype.Repository;

import com.baomidou.mybatisplus.core.metadata.IPage;

@Repository
public class QueueDaoImpl extends BaseDao<Queue, QueueMapper> implements QueueDao {

public QueueDaoImpl(@NonNull QueueMapper queueMapper) {
super(queueMapper);
}

@Override
public IPage<Queue> queryQueuePaging(IPage<Queue> page, List<Integer> ids, String searchVal) {
return mybatisMapper.queryQueuePaging(page, ids, searchVal);
}

@Override
public boolean existQueue(String queue, String queueName) {
return Boolean.TRUE.equals(mybatisMapper.existQueue(queue, queueName));
}

@Override
public Queue queryQueueName(String queue, String queueName) {
return mybatisMapper.queryQueueName(queue, queueName);
}
}
Loading