diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java index a587d53770a..818fab84d68 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java @@ -167,12 +167,13 @@ public Mono> getTopics(String clusterName, ServerWebExchange exchange) { return topicsService.getTopicsForPagination(getCluster(clusterName)) - .flatMap(existingTopics -> { + .flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName)) + .flatMap(topics -> { int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE; var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize; var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC) ? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed(); - List filtered = existingTopics.stream() + List filtered = topics.stream() .filter(topic -> !topic.isInternal() || showInternal != null && showInternal) .filter(topic -> search == null || StringUtils.containsIgnoreCase(topic.getName(), search)) @@ -189,7 +190,6 @@ public Mono> getTopics(String clusterName, return topicsService.loadTopics(getCluster(clusterName), topicsPage) .flatMapMany(Flux::fromIterable) - .filterWhen(dto -> accessControlService.isTopicAccessible(dto, clusterName)) .collectList() .map(topicsToRender -> new TopicsResponseDTO() diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java index b507d4c0810..debd9acd829 100644 --- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java +++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/rbac/AccessControlService.java @@ -202,19 +202,23 @@ public boolean isTopicAccessible(AccessContext context, AuthenticatedUser user) return isAccessible(Resource.TOPIC, context.getTopic(), user, context, requiredActions); } - public Mono isTopicAccessible(InternalTopic dto, String clusterName) { + public Mono> filterViewableTopics(List topics, String clusterName) { if (!rbacEnabled) { - return Mono.just(true); + return Mono.just(topics); } - AccessContext accessContext = AccessContext - .builder() - .cluster(clusterName) - .topic(dto.getName()) - .topicActions(TopicAction.VIEW) - .build(); - - return getUser().map(u -> isTopicAccessible(accessContext, u)); + return getUser() + .map(user -> topics.stream() + .filter(topic -> { + var accessContext = AccessContext + .builder() + .cluster(clusterName) + .topic(topic.getName()) + .topicActions(TopicAction.VIEW) + .build(); + return isTopicAccessible(accessContext, user); + } + ).toList()); } private boolean isConsumerGroupAccessible(AccessContext context, AuthenticatedUser user) { diff --git a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/AccessControlServiceMock.java b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/AccessControlServiceMock.java index 263f1367d8c..852b75ae085 100644 --- a/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/AccessControlServiceMock.java +++ b/kafka-ui-api/src/test/java/com/provectus/kafka/ui/util/AccessControlServiceMock.java @@ -5,6 +5,7 @@ import static org.mockito.Mockito.when; import com.provectus.kafka.ui.service.rbac.AccessControlService; +import java.util.Collections; import org.mockito.Mockito; import reactor.core.publisher.Mono; @@ -16,7 +17,7 @@ public AccessControlService getMock() { when(mock.validateAccess(any())).thenReturn(Mono.empty()); when(mock.isSchemaAccessible(anyString(), anyString())).thenReturn(Mono.just(true)); - when(mock.isTopicAccessible(any(), anyString())).thenReturn(Mono.just(true)); + when(mock.filterViewableTopics(any(), any())).then(invocation -> Mono.just(invocation.getArgument(0))); return mock; }