Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Audit backend #3831

Merged
merged 30 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion documentation/compose/jmx-exporter/kafka-broker.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
rules:
- pattern: ".*"
- pattern: ".*"
2 changes: 2 additions & 0 deletions documentation/compose/kafka-ui-arm64.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ services:
KAFKA_CLUSTERS_0_KAFKACONNECT_0_NAME: first
KAFKA_CLUSTERS_0_KAFKACONNECT_0_ADDRESS: http://kafka-connect0:8083
DYNAMIC_CONFIG_ENABLED: 'true' # not necessary, added for tests
KAFKA_CLUSTERS_0_AUDIT_TOPICAUDITENABLED: 'true'
KAFKA_CLUSTERS_0_AUDIT_CONSOLEAUDITENABLED: 'true'

kafka0:
image: confluentinc/cp-kafka:7.2.1.arm64
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static class Cluster {
List<Masking> masking;
Long pollingThrottleRate;
TruststoreConfig ssl;
AuditProperties audit;
}

@Data
Expand Down Expand Up @@ -143,6 +144,17 @@ public enum Type {
}
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public static class AuditProperties {
String topic;
Integer auditTopicsPartitions;
Boolean topicAuditEnabled;
Boolean consoleAuditEnabled;
Map<String, String> auditTopicProperties;
}

@PostConstruct
public void validateAndSetDefaults() {
if (clusters != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.AclAction;
import com.provectus.kafka.ui.service.acl.AclsService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
Expand All @@ -26,19 +27,22 @@ public class AclsController extends AbstractController implements AclsApi {

private final AclsService aclsService;
private final AccessControlService accessControlService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<Void>> createAcl(String clusterName, Mono<KafkaAclDTO> kafkaAclDto,
ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("createAcl")
.build();

return accessControlService.validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.createAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -48,12 +52,14 @@ public Mono<ResponseEntity<Void>> deleteAcl(String clusterName, Mono<KafkaAclDTO
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("deleteAcl")
.build();

return accessControlService.validateAccess(context)
.then(kafkaAclDto)
.map(ClusterMapper::toAclBinding)
.flatMap(binding -> aclsService.deleteAcl(getCluster(clusterName), binding))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}

Expand All @@ -66,6 +72,7 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.operationName("listAcls")
.build();

var resourceType = Optional.ofNullable(resourceTypeDto)
Expand All @@ -83,20 +90,22 @@ public Mono<ResponseEntity<Flux<KafkaAclDTO>>> listAcls(String clusterName,
ResponseEntity.ok(
aclsService.listAcls(getCluster(clusterName), filter)
.map(ClusterMapper::toKafkaAclDto)))
);
).doOnEach(sig -> auditService.audit(context, sig));
}

@Override
public Mono<ResponseEntity<String>> getAclAsCsv(String clusterName, ServerWebExchange exchange) {
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.VIEW)
.operationName("getAclAsCsv")
.build();

return accessControlService.validateAccess(context).then(
aclsService.getAclAsCsvString(getCluster(clusterName))
.map(ResponseEntity::ok)
.flatMap(Mono::just)
.doOnEach(sig -> auditService.audit(context, sig))
);
}

Expand All @@ -105,11 +114,13 @@ public Mono<ResponseEntity<Void>> syncAclsCsv(String clusterName, Mono<String> c
AccessContext context = AccessContext.builder()
.cluster(clusterName)
.aclActions(AclAction.EDIT)
.operationName("syncAclsCsv")
.build();

return accessControlService.validateAccess(context)
.then(csvMono)
.flatMap(csv -> aclsService.syncAclWithAclCsv(getCluster(clusterName), csv))
.doOnEach(sig -> auditService.audit(context, sig))
.thenReturn(ResponseEntity.ok().build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.service.ApplicationInfoService;
import com.provectus.kafka.ui.service.KafkaClusterFactory;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import com.provectus.kafka.ui.util.ApplicationRestarter;
import com.provectus.kafka.ui.util.DynamicConfigOperations;
Expand Down Expand Up @@ -55,6 +56,7 @@ interface PropertiesMapper {
private final ApplicationRestarter restarter;
private final KafkaClusterFactory kafkaClusterFactory;
private final ApplicationInfoService applicationInfoService;
private final AuditService auditService;

@Override
public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExchange exchange) {
Expand All @@ -63,62 +65,68 @@ public Mono<ResponseEntity<ApplicationInfoDTO>> getApplicationInfo(ServerWebExch

@Override
public Mono<ResponseEntity<ApplicationConfigDTO>> getCurrentConfig(ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(VIEW)
.build()
)
var context = AccessContext.builder()
.applicationConfigActions(VIEW)
.operationName("getCurrentConfig")
.build();
return accessControlService.validateAccess(context)
.then(Mono.fromSupplier(() -> ResponseEntity.ok(
new ApplicationConfigDTO()
.properties(MAPPER.toDto(dynamicConfigOperations.getCurrentProperties()))
)));
)))
.doOnEach(sig -> auditService.audit(context, sig));
}

@Override
public Mono<ResponseEntity<Void>> restartWithConfig(Mono<RestartRequestDTO> restartRequestDto,
ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(EDIT)
.build()
)
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("restartWithConfig")
.build();
return accessControlService.validateAccess(context)
.then(restartRequestDto)
.map(dto -> {
.<ResponseEntity<Void>>map(dto -> {
dynamicConfigOperations.persist(MAPPER.fromDto(dto.getConfig().getProperties()));
restarter.requestRestart();
return ResponseEntity.ok().build();
});
})
.doOnEach(sig -> auditService.audit(context, sig));
}

@Override
public Mono<ResponseEntity<UploadedFileInfoDTO>> uploadConfigRelatedFile(Flux<Part> fileFlux,
ServerWebExchange exchange) {
return accessControlService
.validateAccess(
AccessContext.builder()
.applicationConfigActions(EDIT)
.build()
)
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("uploadConfigRelatedFile")
.build();
return accessControlService.validateAccess(context)
.then(fileFlux.single())
.flatMap(file ->
dynamicConfigOperations.uploadConfigRelatedFile((FilePart) file)
.map(path -> new UploadedFileInfoDTO().location(path.toString()))
.map(ResponseEntity::ok));
.map(ResponseEntity::ok))
.doOnEach(sig -> auditService.audit(context, sig));
}

@Override
public Mono<ResponseEntity<ApplicationConfigValidationDTO>> validateConfig(Mono<ApplicationConfigDTO> configDto,
ServerWebExchange exchange) {
return configDto
var context = AccessContext.builder()
.applicationConfigActions(EDIT)
.operationName("validateConfig")
.build();
return accessControlService.validateAccess(context)
.then(configDto)
.flatMap(config -> {
PropertiesStructure propertiesStructure = MAPPER.fromDto(config.getProperties());
ClustersProperties clustersProperties = propertiesStructure.getKafka();
return validateClustersConfig(clustersProperties)
.map(validations -> new ApplicationConfigValidationDTO().clusters(validations));
})
.map(ResponseEntity::ok);
.map(ResponseEntity::ok)
.doOnEach(sig -> auditService.audit(context, sig));
}

private Mono<Map<String, ClusterConfigValidationDTO>> validateClustersConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.permission.ClusterConfigAction;
import com.provectus.kafka.ui.service.BrokerService;
import com.provectus.kafka.ui.service.audit.AuditService;
import com.provectus.kafka.ui.service.rbac.AccessControlService;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.ResponseEntity;
Expand All @@ -27,78 +30,97 @@
public class BrokersController extends AbstractController implements BrokersApi {
private final BrokerService brokerService;
private final ClusterMapper clusterMapper;

private final AuditService auditService;
private final AccessControlService accessControlService;

@Override
public Mono<ResponseEntity<Flux<BrokerDTO>>> getBrokers(String clusterName,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.build());
.operationName("getBrokers")
.build();

var job = brokerService.getBrokers(getCluster(clusterName)).map(clusterMapper::toBrokerDto);

return validateAccess.thenReturn(ResponseEntity.ok(job));
return accessControlService.validateAccess(context)
.thenReturn(ResponseEntity.ok(job))
.doOnEach(sig -> auditService.audit(context, sig));
}

@Override
public Mono<ResponseEntity<BrokerMetricsDTO>> getBrokersMetrics(String clusterName, Integer id,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.build());
.operationName("getBrokersMetrics")
.operationParams(Map.of("id", id))
.build();

return validateAccess.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
);
return accessControlService.validateAccess(context)
.then(
brokerService.getBrokerMetrics(getCluster(clusterName), id)
.map(clusterMapper::toBrokerMetrics)
.map(ResponseEntity::ok)
.onErrorReturn(ResponseEntity.notFound().build())
)
.doOnEach(sig -> auditService.audit(context, sig));
}

@Override
public Mono<ResponseEntity<Flux<BrokersLogdirsDTO>>> getAllBrokersLogdirs(String clusterName,
List<Integer> brokers,
@Nullable List<Integer> brokers,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()

List<Integer> brokerIds = brokers == null ? List.of() : brokers;

var context = AccessContext.builder()
.cluster(clusterName)
.build());
.operationName("getAllBrokersLogdirs")
.operationParams(Map.of("brokerIds", brokerIds))
.build();

return validateAccess.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokers)));
return accessControlService.validateAccess(context)
.thenReturn(ResponseEntity.ok(
brokerService.getAllBrokersLogdirs(getCluster(clusterName), brokerIds)))
.doOnEach(sig -> auditService.audit(context, sig));
}

@Override
public Mono<ResponseEntity<Flux<BrokerConfigDTO>>> getBrokerConfig(String clusterName,
Integer id,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW)
.build());
.operationName("getBrokerConfig")
.operationParams(Map.of("brokerId", id))
.build();

return validateAccess.thenReturn(
return accessControlService.validateAccess(context).thenReturn(
ResponseEntity.ok(
brokerService.getBrokerConfig(getCluster(clusterName), id)
.map(clusterMapper::toBrokerConfig))
);
).doOnEach(sig -> auditService.audit(context, sig));
}

@Override
public Mono<ResponseEntity<Void>> updateBrokerTopicPartitionLogDir(String clusterName,
Integer id,
Mono<BrokerLogdirUpdateDTO> brokerLogdir,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
.build());
.operationName("updateBrokerTopicPartitionLogDir")
.operationParams(Map.of("brokerId", id))
.build();

return validateAccess.then(
return accessControlService.validateAccess(context).then(
brokerLogdir
.flatMap(bld -> brokerService.updateBrokerLogDir(getCluster(clusterName), id, bld))
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}

@Override
Expand All @@ -107,16 +129,18 @@ public Mono<ResponseEntity<Void>> updateBrokerConfigByName(String clusterName,
String name,
Mono<BrokerConfigItemDTO> brokerConfig,
ServerWebExchange exchange) {
Mono<Void> validateAccess = accessControlService.validateAccess(AccessContext.builder()
var context = AccessContext.builder()
.cluster(clusterName)
.clusterConfigActions(ClusterConfigAction.VIEW, ClusterConfigAction.EDIT)
.build());
.operationName("updateBrokerConfigByName")
.operationParams(Map.of("brokerId", id))
.build();

return validateAccess.then(
return accessControlService.validateAccess(context).then(
brokerConfig
.flatMap(bci -> brokerService.updateBrokerConfigByName(
getCluster(clusterName), id, name, bci.getValue()))
.map(ResponseEntity::ok)
);
).doOnEach(sig -> auditService.audit(context, sig));
}
}
Loading