Skip to content

Add compatibility with GCP Schema Registries #1153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ public static class Cluster {
List<@Valid Masking> masking;

AuditProperties audit;

boolean gcpSchemaRegistry = false;

}

@Data
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@
import io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer;
import io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer;
import io.kafbat.ui.util.jsonschema.JsonAvroConversion;
import java.util.HashMap;
import java.util.Map;
import lombok.SneakyThrows;

interface MessageFormatter {

String format(String topic, byte[] value);

static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient) {
static Map<SchemaType, MessageFormatter> createMap(SchemaRegistryClient schemaRegistryClient,
boolean gcpSchemaRegistry) {
return Map.of(
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient),
SchemaType.AVRO, new AvroMessageFormatter(schemaRegistryClient, gcpSchemaRegistry),
SchemaType.JSON, new JsonSchemaMessageFormatter(schemaRegistryClient),
SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
);
Expand All @@ -29,17 +31,23 @@ SchemaType.PROTOBUF, new ProtobufMessageFormatter(schemaRegistryClient)
class AvroMessageFormatter implements MessageFormatter {
private final KafkaAvroDeserializer avroDeserializer;

AvroMessageFormatter(SchemaRegistryClient client) {
AvroMessageFormatter(SchemaRegistryClient client, boolean gcpSchemaRegistry) {
this.avroDeserializer = new KafkaAvroDeserializer(client);
this.avroDeserializer.configure(
Map.of(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused",
KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false,
KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false,
KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true
),
false
);

final Map<String, Object> avroProps = new HashMap<>();
avroProps.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "wontbeused");
avroProps.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
avroProps.put(KafkaAvroDeserializerConfig.SCHEMA_REFLECTION_CONFIG, false);
avroProps.put(KafkaAvroDeserializerConfig.AVRO_USE_LOGICAL_TYPE_CONVERTERS_CONFIG, true);

if (gcpSchemaRegistry) {
avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CREDENTIALS_SOURCE, "CUSTOM");
avroProps.put(KafkaAvroDeserializerConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS,
"class com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider");
}

this.avroDeserializer.configure(avroProps, false);

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ public class SchemaRegistrySerde implements BuiltInSerde {

private static final byte SR_PAYLOAD_MAGIC_BYTE = 0x0;
private static final int SR_PAYLOAD_PREFIX_LENGTH = 5;
private static final String CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE = "CUSTOM";
private static final String GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS =
"com.google.cloud.hosted.kafka.auth.GcpBearerAuthCredentialProvider";

public static String name() {
return "SchemaRegistry";
Expand Down Expand Up @@ -80,8 +83,10 @@ public void autoConfigure(PropertyResolver kafkaClusterProperties,
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystoreLocation", String.class).orElse(null),
kafkaClusterProperties.getProperty("schemaRegistrySsl.keystorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false)
),
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false),
kafkaClusterProperties.getProperty("schemaRegistryKeySchemaNameTemplate", String.class).orElse("%s-key"),
kafkaClusterProperties.getProperty("schemaRegistrySchemaNameTemplate", String.class).orElse("%s-value"),
kafkaClusterProperties.getProperty("schemaRegistryCheckSchemaExistenceForDeserialize", Boolean.class)
Expand All @@ -106,8 +111,10 @@ public void configure(PropertyResolver serdeProperties,
serdeProperties.getProperty("keystoreLocation", String.class).orElse(null),
serdeProperties.getProperty("keystorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststoreLocation", String.class).orElse(null),
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null)
kafkaClusterProperties.getProperty("ssl.truststorePassword", String.class).orElse(null),
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false)
),
kafkaClusterProperties.getProperty("gcpSchemaRegistry", Boolean.class).orElse(false),
serdeProperties.getProperty("keySchemaNameTemplate", String.class).orElse("%s-key"),
serdeProperties.getProperty("schemaNameTemplate", String.class).orElse("%s-value"),
serdeProperties.getProperty("checkSchemaExistenceForDeserialize", Boolean.class)
Expand All @@ -119,14 +126,15 @@ public void configure(PropertyResolver serdeProperties,
void configure(
List<String> schemaRegistryUrls,
SchemaRegistryClient schemaRegistryClient,
boolean gcpSchemaRegistry,
String keySchemaNameTemplate,
String valueSchemaNameTemplate,
boolean checkTopicSchemaExistenceForDeserialize) {
this.schemaRegistryUrls = schemaRegistryUrls;
this.schemaRegistryClient = schemaRegistryClient;
this.keySchemaNameTemplate = keySchemaNameTemplate;
this.valueSchemaNameTemplate = valueSchemaNameTemplate;
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient);
this.schemaRegistryFormatters = MessageFormatter.createMap(schemaRegistryClient, gcpSchemaRegistry);
this.checkSchemaExistenceForDeserialize = checkTopicSchemaExistenceForDeserialize;
}

Expand All @@ -136,7 +144,8 @@ private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls
@Nullable String keyStoreLocation,
@Nullable String keyStorePassword,
@Nullable String trustStoreLocation,
@Nullable String trustStorePassword) {
@Nullable String trustStorePassword,
boolean gcpSchemaRegistry) {
Map<String, String> configs = new HashMap<>();
if (username != null && password != null) {
configs.put(BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO");
Expand Down Expand Up @@ -166,6 +175,11 @@ private static SchemaRegistryClient createSchemaRegistryClient(List<String> urls
keyStorePassword);
}

if (gcpSchemaRegistry) {
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CREDENTIALS_SOURCE, CUSTOM_BEARER_AUTH_CREDENTIALS_SOURCE);
configs.put(SchemaRegistryClientConfig.BEARER_AUTH_CUSTOM_PROVIDER_CLASS, GCP_BEARER_AUTH_CUSTOM_PROVIDER_CLASS);
}

return new CachedSchemaRegistryClient(
urls,
1_000,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,13 @@ private boolean schemaRegistryConfigured(ClustersProperties.Cluster clusterPrope
}

private ReactiveFailover<KafkaSrClientApi> schemaRegistryClient(ClustersProperties.Cluster clusterProperties) {

var auth = Optional.ofNullable(clusterProperties.getSchemaRegistryAuth())
.orElse(new ClustersProperties.SchemaRegistryAuth());
WebClient webClient = new WebClientConfigurator()
.configureSsl(clusterProperties.getSsl(), clusterProperties.getSchemaRegistrySsl())
.configureBasicAuth(auth.getUsername(), auth.getPassword())
.configureGcpBearerAuth(clusterProperties.isGcpSchemaRegistry())
.configureBufferSize(webClientMaxBuffSize)
.build();
return ReactiveFailover.create(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,21 @@ public Mono<Compatibility> getSchemaCompatibilityLevel(KafkaCluster cluster,
String schemaName) {
return api(cluster)
.mono(c -> c.getSubjectCompatibilityLevel(schemaName, true))
.map(CompatibilityConfig::getCompatibilityLevel)
.map(compatibilityConfig ->
cluster.getOriginalProperties().isGcpSchemaRegistry()
? compatibilityConfig.getCompatibility()
: compatibilityConfig.getCompatibilityLevel())
.onErrorResume(error -> Mono.empty());
}

public Mono<Compatibility> getGlobalSchemaCompatibilityLevel(KafkaCluster cluster) {
return api(cluster)
.mono(KafkaSrClientApi::getGlobalCompatibilityLevel)
.map(CompatibilityConfig::getCompatibilityLevel);
.map(compatibilityConfig ->
cluster.getOriginalProperties().isGcpSchemaRegistry()
? compatibilityConfig.getCompatibility()
: compatibilityConfig.getCompatibilityLevel()
);
}

private Mono<Compatibility> getSchemaCompatibilityInfoOrGlobal(KafkaCluster cluster,
Expand Down
39 changes: 39 additions & 0 deletions api/src/main/java/io/kafbat/ui/util/WebClientConfigurator.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.auth.oauth2.GoogleCredentials;
import io.kafbat.ui.config.ClustersProperties;
import io.kafbat.ui.exception.ValidationException;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import java.io.FileInputStream;
import java.io.IOException;
import java.security.KeyStore;
import java.time.Duration;
import java.util.Collections;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import javax.net.ssl.KeyManagerFactory;
Expand All @@ -24,9 +27,13 @@
import org.springframework.http.codec.json.Jackson2JsonEncoder;
import org.springframework.util.ResourceUtils;
import org.springframework.util.unit.DataSize;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;


public class WebClientConfigurator {

private final WebClient.Builder builder = WebClient.builder();
Expand All @@ -45,6 +52,38 @@ private static ObjectMapper defaultOM() {
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}

public WebClientConfigurator configureGcpBearerAuth(boolean enabled) {
if (enabled) {
System.out.println("Configuring GCP Bearer Auth");
builder.filter(createGcpBearerAuthFilter());
}
return this;
}

private ExchangeFilterFunction createGcpBearerAuthFilter() {
return (request, next) -> {
return Mono.fromCallable(() -> {
try {
// Get credentials using Application Default Credentials (from the GKE service account)
GoogleCredentials credentials = GoogleCredentials.getApplicationDefault()
.createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));

credentials.refreshIfExpired();
return credentials.getAccessToken().getTokenValue();
} catch (IOException e) {
throw new RuntimeException("Failed to get GCP access token", e);
}
})
.flatMap(token -> {
ClientRequest newRequest = ClientRequest.from(request)
// Add the Authorization header
.headers(headers -> headers.setBearerAuth(token))
.build();
return next.exchange(newRequest);
});
};
}

public WebClientConfigurator configureSsl(@Nullable ClustersProperties.TruststoreConfig truststoreConfig,
@Nullable ClustersProperties.KeystoreConfig keystoreConfig) {
if (truststoreConfig != null && !truststoreConfig.isVerifySsl()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class SchemaRegistrySerdeTest {
@BeforeEach
void init() {
serde = new SchemaRegistrySerde();
serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true);
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true);
}

@ParameterizedTest
Expand Down Expand Up @@ -135,7 +135,7 @@ class SerdeWithDisabledSubjectExistenceCheck {

@BeforeEach
void init() {
serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", false);
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", false);
}

@Test
Expand All @@ -151,7 +151,7 @@ class SerdeWithEnabledSubjectExistenceCheck {

@BeforeEach
void init() {
serde.configure(List.of("wontbeused"), registryClient, "%s-key", "%s-value", true);
serde.configure(List.of("wontbeused"), registryClient, false, "%s-key", "%s-value", true);
}

@Test
Expand Down
2 changes: 2 additions & 0 deletions contract/src/main/resources/swagger/kafbat-ui-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4341,6 +4341,8 @@ components:
type: string
keystorePassword:
type: string
gcpSchemaRegistry:
type: boolean
ksqldbServer:
type: string
ksqldbServerSsl:
Expand Down
9 changes: 7 additions & 2 deletions contract/src/main/resources/swagger/kafka-sr-api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,13 @@ components:
properties:
compatibilityLevel:
$ref: '#/components/schemas/Compatibility'
required:
- compatibilityLevel
# GCP Managed Kafka Schema registries specific fields
alias:
type: string
compatibility:
$ref: '#/components/schemas/Compatibility'
normalize:
type: boolean

CompatibilityLevelChange:
type: object
Expand Down
Loading