Skip to content

Commit

Permalink
BE: RBAC: Fix viewable topics filter (#3946)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean authored Jun 20, 2023
1 parent 2ac8646 commit 6fe6165
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,13 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,
ServerWebExchange exchange) {

return topicsService.getTopicsForPagination(getCluster(clusterName))
.flatMap(existingTopics -> {
.flatMap(topics -> accessControlService.filterViewableTopics(topics, clusterName))
.flatMap(topics -> {
int pageSize = perPage != null && perPage > 0 ? perPage : DEFAULT_PAGE_SIZE;
var topicsToSkip = ((page != null && page > 0 ? page : 1) - 1) * pageSize;
var comparator = sortOrder == null || !sortOrder.equals(SortOrderDTO.DESC)
? getComparatorForTopic(orderBy) : getComparatorForTopic(orderBy).reversed();
List<InternalTopic> filtered = existingTopics.stream()
List<InternalTopic> filtered = topics.stream()
.filter(topic -> !topic.isInternal()
|| showInternal != null && showInternal)
.filter(topic -> search == null || StringUtils.containsIgnoreCase(topic.getName(), search))
Expand All @@ -189,7 +190,6 @@ public Mono<ResponseEntity<TopicsResponseDTO>> getTopics(String clusterName,

return topicsService.loadTopics(getCluster(clusterName), topicsPage)
.flatMapMany(Flux::fromIterable)
.filterWhen(dto -> accessControlService.isTopicAccessible(dto, clusterName))
.collectList()
.map(topicsToRender ->
new TopicsResponseDTO()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,19 +202,23 @@ public boolean isTopicAccessible(AccessContext context, AuthenticatedUser user)
return isAccessible(Resource.TOPIC, context.getTopic(), user, context, requiredActions);
}

public Mono<Boolean> isTopicAccessible(InternalTopic dto, String clusterName) {
public Mono<List<InternalTopic>> filterViewableTopics(List<InternalTopic> topics, String clusterName) {
if (!rbacEnabled) {
return Mono.just(true);
return Mono.just(topics);
}

AccessContext accessContext = AccessContext
.builder()
.cluster(clusterName)
.topic(dto.getName())
.topicActions(TopicAction.VIEW)
.build();

return getUser().map(u -> isTopicAccessible(accessContext, u));
return getUser()
.map(user -> topics.stream()
.filter(topic -> {
var accessContext = AccessContext
.builder()
.cluster(clusterName)
.topic(topic.getName())
.topicActions(TopicAction.VIEW)
.build();
return isTopicAccessible(accessContext, user);
}
).toList());
}

private boolean isConsumerGroupAccessible(AccessContext context, AuthenticatedUser user) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static org.mockito.Mockito.when;

import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Collections;
import org.mockito.Mockito;
import reactor.core.publisher.Mono;

Expand All @@ -16,7 +17,7 @@ public AccessControlService getMock() {
when(mock.validateAccess(any())).thenReturn(Mono.empty());
when(mock.isSchemaAccessible(anyString(), anyString())).thenReturn(Mono.just(true));

when(mock.isTopicAccessible(any(), anyString())).thenReturn(Mono.just(true));
when(mock.filterViewableTopics(any(), any())).then(invocation -> Mono.just(invocation.getArgument(0)));

return mock;
}
Expand Down

0 comments on commit 6fe6165

Please sign in to comment.