Skip to content

Commit

Permalink
More code cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
jeroenvandisseldorp committed Jun 27, 2024
1 parent c88019b commit 6912d44
Show file tree
Hide file tree
Showing 24 changed files with 1,088 additions and 428 deletions.
1,290 changes: 998 additions & 292 deletions docs/specification.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,6 @@
* =========================LICENSE_END==================================
*/

import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.axual.ksml.data.exception.DataException;
import io.axual.ksml.data.exception.ExecutionException;
import io.axual.ksml.data.mapper.NativeDataObjectMapper;
Expand All @@ -45,6 +34,16 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import lombok.Getter;
import org.apache.avro.Schema;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* An AVRO {@link Notation} which does not need a running schema registry.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@
* =========================LICENSE_END==================================
*/

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
Expand All @@ -37,6 +31,12 @@
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaResponse;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class SyncMockSchemaRegistryClient implements SchemaRegistryClient {
@Override
public synchronized Optional<ParsedSchema> parseSchema(String schemaType, String schemaString, List<SchemaReference> references) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import io.axual.ksml.data.schema.UnionSchema;
import lombok.Getter;

import java.util.List;
import java.util.Map;

@Getter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
* =========================LICENSE_END==================================
*/

import io.axual.ksml.data.notation.UserType;
import io.axual.ksml.data.schema.DataSchema;
import io.axual.ksml.data.schema.StructSchema;
import io.axual.ksml.data.schema.UnionSchema;

import java.util.List;
import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
* =========================LICENSE_END==================================
*/

import org.apache.kafka.common.serialization.Serializer;

import java.util.List;

import io.axual.ksml.data.exception.ExecutionException;
import io.axual.ksml.data.mapper.DataSchemaMapper;
import io.axual.ksml.data.object.DataNull;
Expand All @@ -33,6 +29,9 @@
import io.axual.ksml.data.schema.StructSchema;
import io.axual.ksml.data.type.DataType;
import lombok.Getter;
import org.apache.kafka.common.serialization.Serializer;

import java.util.List;

import static io.axual.ksml.data.parser.schema.DataSchemaDSL.DATA_OBJECT_TYPE_NAME;
import static io.axual.ksml.data.parser.schema.DataSchemaDSL.DATA_SCHEMA_NAMESPACE;
Expand Down
11 changes: 5 additions & 6 deletions ksml-data/src/main/java/io/axual/ksml/data/serde/UnionSerde.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
* =========================LICENSE_END==================================
*/

import io.axual.ksml.data.exception.ExecutionException;
import io.axual.ksml.data.object.DataNull;
import io.axual.ksml.data.object.DataObject;
import io.axual.ksml.data.type.DataType;
import io.axual.ksml.data.type.UnionType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
Expand All @@ -28,12 +33,6 @@
import java.util.List;
import java.util.Map;

import io.axual.ksml.data.exception.ExecutionException;
import io.axual.ksml.data.object.DataNull;
import io.axual.ksml.data.object.DataObject;
import io.axual.ksml.data.type.DataType;
import io.axual.ksml.data.type.UnionType;

public class UnionSerde implements Serde<Object> {
private record PossibleType(DataType type, Serializer<Object> serializer,
Deserializer<Object> deserializer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@
*/

import lombok.EqualsAndHashCode;
import org.apache.kafka.common.utils.Utils;

import java.util.ArrayList;
import java.util.List;

@EqualsAndHashCode
public class ContextTags extends ArrayList<ContextTag> {
Expand Down
28 changes: 13 additions & 15 deletions ksml-runner/src/main/java/io/axual/ksml/runner/KSMLRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,6 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.state.HostInfo;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import io.axual.ksml.client.serde.ResolvingDeserializer;
import io.axual.ksml.client.serde.ResolvingSerializer;
import io.axual.ksml.data.mapper.DataTypeSchemaMapper;
Expand Down Expand Up @@ -76,6 +61,19 @@
import io.axual.ksml.runner.exception.ConfigException;
import io.axual.ksml.runner.prometheus.PrometheusExport;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.state.HostInfo;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

@Slf4j
public class KSMLRunner {
Expand Down
13 changes: 4 additions & 9 deletions ksml-runner/src/main/java/io/axual/ksml/runner/KsmlInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,15 @@
* =========================LICENSE_END==================================
*/

import io.axual.ksml.metric.KSMLMetrics;
import lombok.extern.slf4j.Slf4j;

import javax.management.*;
import java.io.IOException;
import java.io.InputStream;
import java.lang.management.ManagementFactory;
import java.util.Properties;

import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanRegistrationException;
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;

import io.axual.ksml.metric.KSMLMetrics;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KsmlInfo {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,6 @@
* =========================LICENSE_END==================================
*/

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import io.axual.ksml.client.serde.ResolvingSerializer;
import io.axual.ksml.data.mapper.DataObjectConverter;
import io.axual.ksml.data.mapper.NativeDataObjectMapper;
Expand All @@ -46,6 +38,13 @@
import io.axual.ksml.user.UserPredicate;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static io.axual.ksml.data.notation.UserType.DEFAULT_NOTATION;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
* =========================LICENSE_END==================================
*/

import io.axual.ksml.client.producer.ResolvingProducer;
import io.axual.ksml.generator.TopologyDefinition;
import io.axual.ksml.python.PythonContext;
import io.axual.ksml.python.PythonFunction;
import lombok.Builder;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
Expand All @@ -29,16 +34,8 @@
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

import io.axual.ksml.client.producer.ResolvingProducer;
import io.axual.ksml.generator.TopologyDefinition;
import io.axual.ksml.python.PythonContext;
import io.axual.ksml.python.PythonFunction;
import lombok.Builder;

import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.APPLICATION_ID_CONFIG;

public class KafkaProducerRunner implements Runner {
private static final Logger log = LoggerFactory.getLogger(KafkaProducerRunner.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,6 @@
*/


import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetadata;
import org.apache.kafka.streams.TopologyConfig;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

import io.axual.ksml.TopologyGenerator;
import io.axual.ksml.client.generic.ResolvingClientConfig;
import io.axual.ksml.client.producer.ResolvingProducerConfig;
Expand All @@ -48,8 +32,15 @@
import io.axual.ksml.runner.streams.KSMLClientSupplier;
import lombok.Builder;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.*;

import static org.apache.kafka.clients.producer.ProducerConfig.CLIENT_ID_CONFIG;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
public class KafkaStreamsRunner implements Runner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@


import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

import io.axual.ksml.data.notation.binary.JsonNodeNativeMapper;
import io.axual.ksml.generator.YAMLObjectMapper;
import io.axual.ksml.runner.exception.ConfigException;
Expand All @@ -39,6 +32,12 @@
import lombok.extern.jackson.Jacksonized;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

@Slf4j
@JsonIgnoreProperties(ignoreUnknown = true)
@Builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,17 @@
*/


import com.fasterxml.jackson.annotation.JsonAlias;
import com.fasterxml.jackson.annotation.JsonAnyGetter;
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.*;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;

import java.util.HashMap;
import java.util.Map;

import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
import lombok.extern.slf4j.Slf4j;

import java.util.HashMap;
import java.util.Map;

@Slf4j
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,16 @@


import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;

import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@JsonIgnoreProperties(ignoreUnknown = true)
@Data
Expand Down
Loading

0 comments on commit 6912d44

Please sign in to comment.