diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 23be86369..733e66cb7 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -70,6 +70,9 @@ public static class Cluster { List<@Valid Masking> masking; AuditProperties audit; + + boolean gcpSchemaRegistry = false; + } @Data diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java index 5ab77ffeb..6d6d0eecd 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/MessageFormatter.java @@ -11,6 +11,7 @@ import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer; import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer; import io.kafbat.ui.util.jsonschema.JsonAvroConversion; +import java.util.HashMap; import java.util.Map; import lombok.SneakyThrows; @@ -18,9 +19,10 @@ interface MessageFormatter { String format(String topic, byte[] value); - static Map createMap(SchemaRegistryClient schemaRegistryClient) { + static Map createMap(SchemaRegistryClient schemaRegistryClient, + boolean gcpSchemaRegistry) { return Map.of( - SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient), + SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, gcpSchemaRegistry), SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient), SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient) ); @@ -29,17 +31,23 @@ SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient) class AvroMessageFormatter implements MessageFormatter { private final KafkaAvroDeserializer avroDeserializer; - AvroMessageFormatter(SchemaRegistryClient client) { + AvroMessageFormatter(SchemaRegistryClient client, boolean gcpSchemaRegistry) { this.avroDeserializer = new KafkaAvroDeserializer(client); - this.avroDeserializer.configure( - Map.of( - AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused", - KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false, - KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false, - KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true - ), - false - ); + + final Map avroProps = new HashMap<>(); + avroProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused"); + avroProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false); + avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false); + avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true); + + if (gcpSchemaRegistry) { + avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM"); + avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, + "class com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"); + } + + this.avroDeserializer.configure(avroProps, false); + } @Override diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java index d6f7a3699..9357848c0 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerde.java @@ -42,6 +42,9 @@ public class SchemaRegistrySerde implements BuiltInSerde { private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0; private static final int SR_PAYLOAD_PREFIX_LENGTH = 5; + private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM"; + private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS = + "com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider"; public static String name() { return "SchemaRegistry"; @@ -80,8 +83,10 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties, kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null), kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null), - kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null) + kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null), + kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false) ), + kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false), kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"), kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"), kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class) @@ -106,8 +111,10 @@ public void configure(PropertyResolver serdeProperties, serdeProperties.getProperty("keystoreLocation", String.class).orElse(null), serdeProperties.getProperty("keystorePassword", String.class).orElse(null), kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null), - kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null) + kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null), + kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false) ), + kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false), serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"), serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"), serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class) @@ -119,6 +126,7 @@ public void configure(PropertyResolver serdeProperties, void configure( List schemaRegistryUrls, SchemaRegistryClient schemaRegistryClient, + boolean gcpSchemaRegistry, String keySchemaNameTemplate, String valueSchemaNameTemplate, boolean checkTopicSchemaExistenceForDeserialize) { @@ -126,7 +134,7 @@ void configure( this.schemaRegistryClient = schemaRegistryClient; this.keySchemaNameTemplate = keySchemaNameTemplate; this.valueSchemaNameTemplate = valueSchemaNameTemplate; - this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient); + this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, gcpSchemaRegistry); this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize; } @@ -136,7 +144,8 @@ private static SchemaRegistryClient createSchemaRegistryClient(List urls @Nullable String keyStoreLocation, @Nullable String keyStorePassword, @Nullable String trustStoreLocation, - @Nullable String trustStorePassword) { + @Nullable String trustStorePassword, + boolean gcpSchemaRegistry) { Map configs = new HashMap<>(); if (username != null && password != null) { configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); @@ -166,6 +175,11 @@ private static SchemaRegistryClient createSchemaRegistryClient(List urls keyStorePassword); } + if (gcpSchemaRegistry) { + configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE); + configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS); + } + return new CachedSchemaRegistryClient( urls, 1_000, diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index f8c528f90..f764a079f 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -167,11 +167,13 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope } private ReactiveFailover schemaRegistryClient(ClustersProperties.Cluster clusterProperties) { + var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth()) .orElse(new ClustersProperties.SchemaRegistryAuth()); WebClient webClient = new WebClientConfigurator() .configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl()) .configureBasicAuth(auth.getUsername(), auth.getPassword()) + .configureGcpBearerAuth(clusterProperties.isGcpSchemaRegistry()) .configureBufferSize(webClientMaxBuffSize) .build(); return ReactiveFailover.create( diff --git a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java index c725a787e..670bee546 100644 --- a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java +++ b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java @@ -148,14 +148,21 @@ public Mono getSchemaCompatibilityLevel(KafkaCluster cluster, String schemaName) { return api(cluster) .mono(c -> c.getSubjectCompatibilityLevel(schemaName, true)) - .map(CompatibilityConfig::getCompatibilityLevel) + .map(compatibilityConfig -> + cluster.getOriginalProperties().isGcpSchemaRegistry() + ? compatibilityConfig.getCompatibility() + : compatibilityConfig.getCompatibilityLevel()) .onErrorResume(error -> Mono.empty()); } public Mono getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) { return api(cluster) .mono(KafkaSrClientApi::getGlobalCompatibilityLevel) - .map(CompatibilityConfig::getCompatibilityLevel); + .map(compatibilityConfig -> + cluster.getOriginalProperties().isGcpSchemaRegistry() + ? compatibilityConfig.getCompatibility() + : compatibilityConfig.getCompatibilityLevel() + ); } private Mono getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster, diff --git a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java index 170530be1..c989e0539 100644 --- a/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java +++ b/api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java @@ -3,14 +3,17 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.auth.oauth2.GoogleCredentials; import io.kafbat.ui.config.ClustersProperties; import io.kafbat.ui.exception.ValidationException; import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import java.io.FileInputStream; +import java.io.IOException; import java.security.KeyStore; import java.time.Duration; +import java.util.Collections; import java.util.function.Consumer; import javax.annotation.Nullable; import javax.net.ssl.KeyManagerFactory; @@ -24,9 +27,13 @@ import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.util.ResourceUtils; import org.springframework.util.unit.DataSize; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; + public class WebClientConfigurator { private final WebClient.Builder builder = WebClient.builder(); @@ -45,6 +52,38 @@ private static ObjectMapper defaultOM() { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } + public WebClientConfigurator configureGcpBearerAuth(boolean enabled) { + if (enabled) { + System.out.println("Configuring GCP Bearer Auth"); + builder.filter(createGcpBearerAuthFilter()); + } + return this; + } + + private ExchangeFilterFunction createGcpBearerAuthFilter() { + return (request, next) -> { + return Mono.fromCallable(() -> { + try { + // Get credentials using Application Default Credentials (from the GKE service account) + GoogleCredentials credentials = GoogleCredentials.getApplicationDefault() + .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform")); + + credentials.refreshIfExpired(); + return credentials.getAccessToken().getTokenValue(); + } catch (IOException e) { + throw new RuntimeException("Failed to get GCP access token", e); + } + }) + .flatMap(token -> { + ClientRequest newRequest = ClientRequest.from(request) + // Add the Authorization header + .headers(headers -> headers.setBearerAuth(token)) + .build(); + return next.exchange(newRequest); + }); + }; + } + public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig, @Nullable ClustersProperties.KeystoreConfig keystoreConfig) { if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) { diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java index d66a8d004..7fe37022d 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/sr/SchemaRegistrySerdeTest.java @@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest { @BeforeEach void init() { serde = new SchemaRegistrySerde(); - serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true); + serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true); } @ParameterizedTest @@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck { @BeforeEach void init() { - serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", false); + serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", false); } @Test @@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck { @BeforeEach void init() { - serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true); + serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true); } @Test diff --git a/contract/src/main/resources/swagger/kafbat-ui-api.yaml b/contract/src/main/resources/swagger/kafbat-ui-api.yaml index 8769e6aa1..bb2c8ea97 100644 --- a/contract/src/main/resources/swagger/kafbat-ui-api.yaml +++ b/contract/src/main/resources/swagger/kafbat-ui-api.yaml @@ -4341,6 +4341,8 @@ components: type: string keystorePassword: type: string + gcpSchemaRegistry: + type: boolean ksqldbServer: type: string ksqldbServerSsl: diff --git a/contract/src/main/resources/swagger/kafka-sr-api.yaml b/contract/src/main/resources/swagger/kafka-sr-api.yaml index 2b082d689..b025b3050 100644 --- a/contract/src/main/resources/swagger/kafka-sr-api.yaml +++ b/contract/src/main/resources/swagger/kafka-sr-api.yaml @@ -381,8 +381,13 @@ components: properties: compatibilityLevel: $ref: '#/components/schemas/Compatibility' - required: - - compatibilityLevel + # GCP Managed Kafka Schema registries specific fields + alias: + type: string + compatibility: + $ref: '#/components/schemas/Compatibility' + normalize: + type: boolean CompatibilityLevelChange: type: object