diff --git a/.scalafmt.conf b/.scalafmt.conf
index 9f53f67..4b5c29e 100644
--- a/.scalafmt.conf
+++ b/.scalafmt.conf
@@ -1,2 +1,2 @@
-version = 3.5.1
+version = 3.5.3
 runner.dialect = scala212
diff --git a/build.sbt b/build.sbt
index 6177e39..3d1c2ee 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,10 +1,12 @@
+// scala-steward:off
 name := "kafkaquery"
 
 scalaVersion := "2.12.15"
 
 scalacOptions ++= Seq(
   "-deprecation",
-  "-encoding", "UTF-8",
+  "-encoding",
+  "UTF-8",
   "-feature",
   "-unchecked",
   "-language:higherKinds"
@@ -14,44 +16,36 @@ enablePlugins(PackPlugin)
 packMain := Map(name.value -> "org.kafkaquery.CLI")
 packExtraClasspath := Map(name.value -> Seq("${PROG_HOME}/udf_dependencies/*"))
 
-lazy val flinkVersion = "1.12.7"
-lazy val kafkaVersion = "3.1.0"
-lazy val log4jVersion = "2.17.2"
-lazy val scalatestVersion = "3.2.11"
+lazy val flinkVersion = "1.12.7"
+lazy val kafkaVersion = "3.1.0"
+lazy val log4jVersion = "2.17.2"
+lazy val scalatestVersion = "3.2.11"
 
 libraryDependencies ++= Seq(
-  "org.apache.logging.log4j" % "log4j-api" % log4jVersion,
-  "org.apache.logging.log4j" % "log4j-core" % log4jVersion % Runtime,
-  "org.apache.logging.log4j" %% "log4j-api-scala" % "12.0",
-
-  "org.apache.flink" %% "flink-scala" % flinkVersion,
-  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
-  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
-  "org.apache.flink" %% "flink-clients" % flinkVersion,
-  "org.apache.flink" % "flink-core" % flinkVersion classifier "tests",
-
-  "org.apache.zookeeper" % "zookeeper" % "3.7.0",
-  "io.dropwizard.metrics" % "metrics-core" % "4.2.9" % Test,
-
-  "org.scalactic" %% "scalactic" % scalatestVersion % Test,
-  "org.scalatest" %% "scalatest" % scalatestVersion % Test,
-  "org.mockito" %% "mockito-scala" % "1.17.5" % Test,
-
-  "org.apache.kafka" % "kafka-clients" % kafkaVersion,
-  "io.github.embeddedkafka" %% "embedded-kafka" % kafkaVersion % Test,
-
-  "org.apache.avro" % "avro" % "1.11.0",
-
-  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
-  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion,
-  "org.apache.flink" % "flink-json" % flinkVersion,
-
-  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
-  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests",
-  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % Test classifier "tests",
-
-  "com.github.scopt" %% "scopt" % "4.0.1",
-  "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.12" % Test,
+  "org.apache.logging.log4j" % "log4j-api" % log4jVersion,
+  "org.apache.logging.log4j" % "log4j-core" % log4jVersion % Runtime,
+  "org.apache.logging.log4j" %% "log4j-api-scala" % "12.0",
+  "org.apache.flink" %% "flink-scala" % flinkVersion,
+  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
+  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
+  "org.apache.flink" %% "flink-clients" % flinkVersion,
+  "org.apache.flink" % "flink-core" % flinkVersion classifier "tests",
+  "org.apache.zookeeper" % "zookeeper" % "3.7.0",
+  "io.dropwizard.metrics" % "metrics-core" % "4.2.9" % Test,
+  "org.scalactic" %% "scalactic" % scalatestVersion % Test,
+  "org.scalatest" %% "scalatest" % scalatestVersion % Test,
+  "org.mockito" %% "mockito-scala" % "1.17.5" % Test,
+  "org.apache.kafka" % "kafka-clients" % kafkaVersion,
+  "io.github.embeddedkafka" %% "embedded-kafka" % kafkaVersion % Test,
+  "org.apache.avro" % "avro" % "1.11.0",
+  "org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
+  "org.apache.flink" %% "flink-table-planner-blink" % flinkVersion,
+  "org.apache.flink" % "flink-json" % flinkVersion,
+  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
+  "org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests",
+  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % Test classifier "tests",
+  "com.github.scopt" %% "scopt" % "4.0.1",
+  "com.sksamuel.avro4s" %% "avro4s-core" % "4.0.12" % Test
 )
 
 // Fork all tasks
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 92f467d..6ada399 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -1,3 +1,4 @@
-addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
-addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
-addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.14")
+// scala-steward:off
+addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
+addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
+addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.14")
diff --git a/src/test/scala/org/kafkaquery/commands/QueryCommandTest.scala b/src/test/scala/org/kafkaquery/commands/QueryCommandTest.scala
index 2fac7ff..1112f9a 100644
--- a/src/test/scala/org/kafkaquery/commands/QueryCommandTest.scala
+++ b/src/test/scala/org/kafkaquery/commands/QueryCommandTest.scala
@@ -15,17 +15,20 @@ import org.mockito.MockitoSugar
 import org.scalatest.BeforeAndAfter
 import org.scalatest.funsuite.AnyFunSuite
 
-class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafka with MockitoSugar {
+class QueryCommandTest
+    extends AnyFunSuite
+    with BeforeAndAfter
+    with EmbeddedKafka
+    with MockitoSugar {
 
-  test ("Query should run and produce expected results") {
+  test("Query should run and produce expected results") {
     CollectRowSink.values.clear()
     val tableName = "t1"
 
     val zkExposerMock = mock[ZookeeperSchemaExposer]
     doReturn(List("t1", "t2")).when(zkExposerMock).getAllChildren
 
-    val t1Schema = new Schema.Parser().parse(
-      """
+    val t1Schema = new Schema.Parser().parse("""
         |{ "type": "record", "name": "t1", "fields": [ { "name": "f1", "type": "string" },
         |{ "name": "f2", "type": "int" } ], "rowtime": "false" }
         |""".stripMargin)
@@ -42,7 +45,13 @@ class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafk
       publishStringMessageToKafka(tableName, "error")
 
       val qc = new QueryCommand(
-        QueryConfig(timeout = 4, query = "select f1 from t1", startStrategy = EarliestQueryStart(), ignoreParseErr = false, timeoutFunc = () => ()),
+        QueryConfig(
+          timeout = 4,
+          query = "select f1 from t1",
+          startStrategy = EarliestQueryStart(),
+          ignoreParseErr = false,
+          timeoutFunc = () => ()
+        ),
         zkExposerMock,
         s"localhost:${config.kafkaPort}"
       )
@@ -54,20 +63,21 @@ class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafk
         case _: JobExecutionException =>
       }
 
-      assertResult(util.Arrays.asList(Row.of("val1"), Row.of("val2")))(CollectRowSink.values)
+      assertResult(util.Arrays.asList(Row.of("val1"), Row.of("val2")))(
+        CollectRowSink.values
+      )
     }
 
   }
 
-  test ("usage of a User-defined function") {
+  test("usage of a User-defined function") {
     CollectRowSink.values.clear()
     val tableName = "t1"
 
     val zkExposerMock = mock[ZookeeperSchemaExposer]
     doReturn(List("t1", "t2")).when(zkExposerMock).getAllChildren
 
-    val t1Schema = new Schema.Parser().parse(
-      """
+    val t1Schema = new Schema.Parser().parse("""
         |{ "type": "record", "name": "t1", "fields": [ { "name": "f1", "type": "string" },
         |{ "name": "f2", "type": "int" } ], "rowtime": "false" }
         |""".stripMargin)
@@ -85,19 +95,27 @@ class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafk
 
       val udfName = "MyUDF.java"
 
-      new PrintWriter(udfName) {write(
-        """import org.apache.flink.table.functions.ScalarFunction;
+      new PrintWriter(udfName) {
+        write("""import org.apache.flink.table.functions.ScalarFunction;
           |
           |public class MyUDF extends ScalarFunction {
           |  public String eval(String input) {
           |    return input + " : udf invoked";
           |  }
-          |}""".stripMargin); close()}
+          |}""".stripMargin); close()
+      }
       val udfFile = new File(udfName)
       udfFile.deleteOnExit()
 
       val qc = new QueryCommand(
-        QueryConfig(timeout = 4, query = "select MyUDF(f1) from t1", startStrategy = EarliestQueryStart(), ignoreParseErr = false, timeoutFunc = () => (), userFunctions = new Parser().getClassNameList(List(udfFile))),
+        QueryConfig(
+          timeout = 4,
+          query = "select MyUDF(f1) from t1",
+          startStrategy = EarliestQueryStart(),
+          ignoreParseErr = false,
+          timeoutFunc = () => (),
+          userFunctions = new Parser().getClassNameList(List(udfFile))
+        ),
         zkExposerMock,
         s"localhost:${config.kafkaPort}"
       )
@@ -109,7 +127,10 @@ class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafk
         case _: JobExecutionException =>
       }
 
-      assertResult(util.Arrays.asList(Row.of("val1 : udf invoked"), Row.of("val2 : udf invoked")))(CollectRowSink.values)
+      assertResult(
+        util.Arrays
+          .asList(Row.of("val1 : udf invoked"), Row.of("val2 : udf invoked"))
+      )(CollectRowSink.values)
     }
 
   }
@@ -123,5 +144,6 @@ class CollectRowSink extends SinkFunction[Row] {
 }
 
 object CollectRowSink {
-  val values: util.List[Row] = Collections.synchronizedList(new util.ArrayList())
+  val values: util.List[Row] =
+    Collections.synchronizedList(new util.ArrayList())
 }
diff --git a/src/test/scala/org/kafkaquery/parsers/ParserTest.scala b/src/test/scala/org/kafkaquery/parsers/ParserTest.scala
index a3ffc6a..93d80bb 100644
--- a/src/test/scala/org/kafkaquery/parsers/ParserTest.scala
+++ b/src/test/scala/org/kafkaquery/parsers/ParserTest.scala
@@ -14,7 +14,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
 
   private val subjectName = "testSubject"
   private val subjectSchema = new Schema.Parser()
-    .parse("""{"type":"record","name":"testCC","fields":[{"name":"s","type":"string"}]}""")
+    .parse(
+      """{"type":"record","name":"testCC","fields":[{"name":"s","type":"string"}]}"""
+    )
 
   private var parser: Parser = _
   private var outStream: ByteArrayOutputStream = _
@@ -53,7 +55,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
 
   test("parseNothing") {
    withRunningKafkaOnFoundPort(config) { implicit config =>
-      parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
+      parser.setSchemaExposer(
+        new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
+      )
       assertThrows[RuntimeException] {
         parser.parse(null)
       }
@@ -62,7 +66,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
 
   test("parseDefinedPlusParseEmpty") {
     withRunningKafkaOnFoundPort(config) { implicit config =>
-      parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
+      parser.setSchemaExposer(
+        new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
+      )
       parser.getSchemaExposer.put(subjectSchema, subjectName)
 
       parser.parse(Array("-t", subjectName))
@@ -72,10 +78,12 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
 
   test("printAllTopics") {
     withRunningKafkaOnFoundPort(config) { implicit config =>
-      parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
+      parser.setSchemaExposer(
+        new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
+      )
       parser.getSchemaExposer.put(subjectSchema, subjectName)
 
-      //check whether the TopicParser prints the same output after more than 1 call.
+      // check whether the TopicParser prints the same output after more than 1 call.
       Console.withOut(outStream)(parser.printTopics())
       val res = new String(outStream.toByteArray)
       val otherOutStream = new java.io.ByteArrayOutputStream
@@ -88,7 +96,12 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
   test("setKafkaAddress") {
     withRunningKafkaOnFoundPort(config) { implicit config =>
       val kafkaAddress = "someAddress"
-      val parserConfig = parser.parseConfig(("--kafka " + kafkaAddress + " --zookeeper \"notworkingAddress\"").split(" ")).get
+      val parserConfig = parser
+        .parseConfig(
+          ("--kafka " + kafkaAddress + " --zookeeper \"notworkingAddress\"")
+            .split(" ")
+        )
+        .get
       assert(parserConfig.kafkaAddress == kafkaAddress)
     }
   }
@@ -96,23 +109,28 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
   test("setZooKeeperAddress") {
     withRunningKafkaOnFoundPort(config) { implicit config =>
       val ZKAddress = "someOTherAddress"
-      val parserConfig = parser.parseConfig(("--zookeeper "+ ZKAddress).split(" ")).get
+      val parserConfig =
+        parser.parseConfig(("--zookeeper " + ZKAddress).split(" ")).get
       assert(parserConfig.zookeeperAddress == ZKAddress)
     }
   }
 
-
   test("updateSchemaFromFile") {
     withRunningKafkaOnFoundPort(config) { implicit config =>
-
       val fileName = "schema"
       val zkAddress = s"localhost:${config.zooKeeperPort}"
-      val avroSchema = """{"type":"record","name":"Person","namespace":"org.codefeedr.plugins.repl.org.kafkaquery.parsers.Parser.updateSchema","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"city","type":"string"}]}"""
-      new PrintWriter(fileName) {write(avroSchema); close()}
+      val avroSchema =
+        """{"type":"record","name":"Person","namespace":"org.codefeedr.plugins.repl.org.kafkaquery.parsers.Parser.updateSchema","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"city","type":"string"}]}"""
+      new PrintWriter(fileName) { write(avroSchema); close() }
 
-      parser.parse(("--update-schema "+ subjectName +"=" + fileName+ " --zookeeper "+zkAddress).split(" "))
+      parser.parse(
+        ("--update-schema " + subjectName + "=" + fileName + " --zookeeper " + zkAddress)
+          .split(" ")
+      )
 
-      assert(parser.getSchemaExposer.get(subjectName).get.toString.equals(avroSchema))
+      assert(
+        parser.getSchemaExposer.get(subjectName).get.toString.equals(avroSchema)
+      )
 
       FileUtils.deleteQuietly(new File(fileName))
     }
@@ -120,7 +138,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
 
   test("updateSchemaParserFailure") {
     withRunningKafkaOnFoundPort(config) { implicit config =>
-      parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
+      parser.setSchemaExposer(
+        new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
+      )
       val avroSchema = """incorrect Avro Format"""
       Console.withErr(outStream) {
         parser.updateSchema(subjectName, avroSchema)
@@ -133,7 +153,8 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
 
   test("checkConfigQuery") {
     val args: Seq[String] = Seq(
-      "-q", "select * from topic"
+      "-q",
+      "select * from topic"
     )
 
     val parsed = parser.parseConfig(args)
@@ -150,7 +171,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
     val topic = "World"
 
     withRunningKafkaOnFoundPort(config) { implicit config =>
-      parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
+      parser.setSchemaExposer(
+        new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
+      )
       parser.getSchemaExposer.put(npmTableSchema, topic)
 
       Console.withOut(outStream) {
@@ -166,7 +189,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
 
   test("testFailToPrintEmptyTopicSchema") {
     withRunningKafkaOnFoundPort(config) { implicit config =>
-      parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
+      parser.setSchemaExposer(
+        new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
+      )
 
       Console.withErr(outStream) {
 
diff --git a/src/test/scala/org/kafkaquery/transforms/JsonToAvroSchemaTest.scala b/src/test/scala/org/kafkaquery/transforms/JsonToAvroSchemaTest.scala
index 57119aa..08f6262 100644
--- a/src/test/scala/org/kafkaquery/transforms/JsonToAvroSchemaTest.scala
+++ b/src/test/scala/org/kafkaquery/transforms/JsonToAvroSchemaTest.scala
@@ -4,9 +4,17 @@ import org.apache.avro.Schema
 import org.kafkaquery.util.{KafkaRecordRetriever, UserInputRetriever}
 import org.mockito.MockitoSugar
 import org.scalatest.funsuite.AnyFunSuite
-import org.scalatest.prop.{TableDrivenPropertyChecks, TableFor1, TableFor2, TableFor3}
+import org.scalatest.prop.{
+  TableDrivenPropertyChecks,
+  TableFor1,
+  TableFor2,
+  TableFor3
+}
 
-class JsonToAvroSchemaTest extends AnyFunSuite with TableDrivenPropertyChecks with MockitoSugar {
+class JsonToAvroSchemaTest
+    extends AnyFunSuite
+    with TableDrivenPropertyChecks
+    with MockitoSugar {
 
   val topicName = "myTopic"
 
@@ -227,8 +235,7 @@ class JsonToAvroSchemaTest extends AnyFunSuite with TableDrivenPropertyChecks wi
     )
   )
 
-  /**
-    * Parameterized good weather tests for all supported types.
+  /** Parameterized good weather tests for all supported types.
     */
   forAll(testData) { (avroSchema: String, jsonSample: String) =>
     assertResult(new Schema.Parser().parse(avroSchema)) {
@@ -261,8 +268,7 @@ class JsonToAvroSchemaTest extends AnyFunSuite with TableDrivenPropertyChecks wi
       |""".stripMargin*/
   )
 
-  /**
-    * Parameterized bad weather tests.
+  /** Parameterized bad weather tests.
     */
   forAll(exceptionalTestData) { jsonSample: String =>
     assertThrows[IllegalArgumentException] {
@@ -273,7 +279,6 @@ class JsonToAvroSchemaTest extends AnyFunSuite with TableDrivenPropertyChecks wi
     }
   }
 
-
   val objectOrMapData: TableFor3[String, String, Char] = Table(
     ("AvroSchema", "JsonSample", "Object Or Map?"),
 
@@ -421,8 +426,8 @@ class JsonToAvroSchemaTest extends AnyFunSuite with TableDrivenPropertyChecks wi
       'm'
     )
   )
-  /**
-    * Tests for object - map ambiguity feature
+
+  /** Tests for object - map ambiguity feature
     */
   forAll(objectOrMapData) { (schema: String, jsonSample: String, input: Char) =>
     assertResult(new Schema.Parser().parse(schema)) {
diff --git a/src/test/scala/org/kafkaquery/transforms/QueryOutputTest.scala b/src/test/scala/org/kafkaquery/transforms/QueryOutputTest.scala
index ca02806..ed93ea3 100644
--- a/src/test/scala/org/kafkaquery/transforms/QueryOutputTest.scala
+++ b/src/test/scala/org/kafkaquery/transforms/QueryOutputTest.scala
@@ -3,10 +3,18 @@ package org.kafkaquery.transforms
 import io.github.embeddedkafka.Codecs.stringDeserializer
 import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
-import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, createTypeInformation}
+import org.apache.flink.streaming.api.scala.{
+  DataStream,
+  StreamExecutionEnvironment,
+  createTypeInformation
+}
 import org.apache.flink.test.util.MiniClusterWithClientResource
 import org.apache.flink.types.Row
-import org.kafkaquery.parsers.Configurations.{ConsoleQueryOut, KafkaQueryOut, SocketQueryOut}
+import org.kafkaquery.parsers.Configurations.{
+  ConsoleQueryOut,
+  KafkaQueryOut,
+  SocketQueryOut
+}
 import org.kafkaquery.sinks.SocketSink
 import org.mockito.MockitoSugar
 import org.scalatest.BeforeAndAfter
@@ -15,12 +23,18 @@ import org.scalatest.funsuite.AnyFunSuite
 import java.io.{ByteArrayOutputStream, PrintStream}
 import java.net.Socket
 
-class QueryOutputTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafka with MockitoSugar {
+class QueryOutputTest
+    extends AnyFunSuite
+    with BeforeAndAfter
+    with EmbeddedKafka
+    with MockitoSugar {
 
-  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
-    .setNumberSlotsPerTaskManager(1)
-    .setNumberTaskManagers(1)
-    .build)
+  val flinkCluster = new MiniClusterWithClientResource(
+    new MiniClusterResourceConfiguration.Builder()
+      .setNumberSlotsPerTaskManager(1)
+      .setNumberTaskManagers(1)
+      .build
+  )
 
   var env: StreamExecutionEnvironment = _
   var ds: DataStream[Row] = _
@@ -67,10 +81,12 @@ class QueryOutputTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafka
       zooKeeperPort = 0
     )
 
-    withRunningKafkaOnFoundPort(config) {
-      implicit config =>
-
-        QueryOutput.selectOutput(ds, KafkaQueryOut(topic = topicName),"localhost:" + config.kafkaPort)
+    withRunningKafkaOnFoundPort(config) { implicit config =>
+      QueryOutput.selectOutput(
+        ds,
+        KafkaQueryOut(topic = topicName),
+        "localhost:" + config.kafkaPort
+      )
 
       env.execute()
       assertResult(consumeFirstMessageFrom(topicName))("val1")
diff --git a/src/test/scala/org/kafkaquery/transforms/QuerySetupTest.scala b/src/test/scala/org/kafkaquery/transforms/QuerySetupTest.scala
index fec8f8e..cb0fc4a 100644
--- a/src/test/scala/org/kafkaquery/transforms/QuerySetupTest.scala
+++ b/src/test/scala/org/kafkaquery/transforms/QuerySetupTest.scala
@@ -2,22 +2,31 @@ package org.kafkaquery.transforms
 
 import java.lang
 
 import org.apache.avro.Schema
-import org.kafkaquery.parsers.Configurations.{EarliestQueryStart, LatestQueryStart, QueryStart}
+import org.kafkaquery.parsers.Configurations.{
+  EarliestQueryStart,
+  LatestQueryStart,
+  QueryStart
+}
 import org.scalatest.BeforeAndAfter
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.prop.{TableDrivenPropertyChecks, TableFor1, TableFor2}
 
-class QuerySetupTest extends AnyFunSuite with BeforeAndAfter with TableDrivenPropertyChecks {
+class QuerySetupTest
+    extends AnyFunSuite
+    with BeforeAndAfter
+    with TableDrivenPropertyChecks {
 
   test("Topics should correctly be extracted from query") {
-    val res = QuerySetup.extractTopics("SELECT * from t1, t2, t3", List("t2", "t3", "t4"))
+    val res = QuerySetup.extractTopics(
+      "SELECT * from t1, t2, t3",
+      List("t2", "t3", "t4")
+    )
     assert(res == List("t2", "t3"))
   }
 
   val testDataGetTableCreationCommand: TableFor1[QueryStart] = Table(
     "startStrategy",
-
     EarliestQueryStart(),
     LatestQueryStart()
   )
@@ -34,15 +43,19 @@ class QuerySetupTest extends AnyFunSuite with BeforeAndAfter with TableDrivenPro
         "'json.timestamp-format.standard' = 'ISO-8601', 'json.ignore-parse-errors' = 'true', " +
         "'json.fail-on-missing-field' = 'false')"
     )(
-      QuerySetup.getTableCreationCommand(tableName, new java.lang.StringBuilder(tableFields), kafkaAddr,
-        startStrategy = startStrategy, ignoreParseErr = true)
+      QuerySetup.getTableCreationCommand(
+        tableName,
+        new java.lang.StringBuilder(tableFields),
+        kafkaAddr,
+        startStrategy = startStrategy,
+        ignoreParseErr = true
+      )
     )
   }
 
   val testDataGenerateTableSchema: TableFor2[String, String] = Table(
     ("schemaStr", "tableDesc"),
 
-
     (
       """
        |{
@@ -85,13 +98,18 @@ class QuerySetupTest extends AnyFunSuite with BeforeAndAfter with TableDrivenPro
      "field field type, field TIMESTAMP(3), WATERMARK FOR field AS field - INTERVAL '0.001' SECOND"
     )
   )
-  forAll(testDataGenerateTableSchema) { (schemaStr: String, tableDesc: String) =>
-    assertResult(
-      tableDesc
-    )(
-      QuerySetup.generateTableSchema(new Schema.Parser().parse(schemaStr),
-        (_, _) => ("field", new lang.StringBuilder("field type"))).toString
-    )
+  forAll(testDataGenerateTableSchema) {
+    (schemaStr: String, tableDesc: String) =>
+      assertResult(
+        tableDesc
+      )(
+        QuerySetup
+          .generateTableSchema(
+            new Schema.Parser().parse(schemaStr),
+            (_, _) => ("field", new lang.StringBuilder("field type"))
+          )
+          .toString
+      )
   }
 }
 
diff --git a/src/test/scala/org/kafkaquery/transforms/SchemaConverterTest.scala b/src/test/scala/org/kafkaquery/transforms/SchemaConverterTest.scala
index fd3341b..5a5d3fc 100644
--- a/src/test/scala/org/kafkaquery/transforms/SchemaConverterTest.scala
+++ b/src/test/scala/org/kafkaquery/transforms/SchemaConverterTest.scala
@@ -12,17 +12,27 @@ class SchemaConverterTest extends AnyFunSuite with TableDrivenPropertyChecks {
 
   case class Nested(someVal: Int, someOtherVal: String)
 
-  case class AllSupportedTypes(someString: String, someFloat: Float, someDouble: Double, someInt: Int,
-    someBoolean: Boolean, someLong: Long, someOptional: Option[String], someByte: ByteBuffer,
-    someMap: Map[String, Int], someArray: Array[Int], someList: List[Long], someNested: Nested,
-    someNestedList: List[Nested])
+  case class AllSupportedTypes(
+      someString: String,
+      someFloat: Float,
+      someDouble: Double,
+      someInt: Int,
+      someBoolean: Boolean,
+      someLong: Long,
+      someOptional: Option[String],
+      someByte: ByteBuffer,
+      someMap: Map[String, Int],
+      someArray: Array[Int],
+      someList: List[Long],
+      someNested: Nested,
+      someNestedList: List[Nested]
+  )
 
   val schema: Schema = AvroSchema[AllSupportedTypes]
 
   val testData: TableFor2[String, String] = Table(
     ("FieldName", "expectedType"),
 
-
     ("someString", "STRING"),
     ("someFloat", "FLOAT"),
     ("someDouble", "DOUBLE"),
@@ -38,9 +48,8 @@ class SchemaConverterTest extends AnyFunSuite with TableDrivenPropertyChecks {
     ("someNestedList", "ARRAY>")
   )
 
-  /**
-    * Parameterized good weather tests for all supported types.
-    */
+  /** Parameterized good weather tests for all supported types.
+    */
   forAll(testData) { (name: String, t: String) =>
     assertResult(("`" + name + "`", t)) {
       val res = getNestedSchema(name, schema.getField(name).schema())
@@ -48,8 +57,7 @@ class SchemaConverterTest extends AnyFunSuite with TableDrivenPropertyChecks {
     }
   }
 
-  /**
-    * Test unsupported types.
+  /** Test unsupported types.
     */
 
   assertThrows[RuntimeException] {
diff --git a/src/test/scala/org/kafkaquery/transforms/SimpleSchemaGeneratorTest.scala b/src/test/scala/org/kafkaquery/transforms/SimpleSchemaGeneratorTest.scala
index 863859c..7e81252 100644
--- a/src/test/scala/org/kafkaquery/transforms/SimpleSchemaGeneratorTest.scala
+++ b/src/test/scala/org/kafkaquery/transforms/SimpleSchemaGeneratorTest.scala
@@ -227,7 +227,11 @@
       | }
       | } ]
       |}""".stripMargin)
-    assert(SimpleSchemaGenerator.getSimpleSchema(schema).equals("""PyPiReleaseExt : RECORD
+    assert(
+      SimpleSchemaGenerator
+        .getSimpleSchema(schema)
+        .equals(
+          """PyPiReleaseExt : RECORD
       | title : STRING
       | link : STRING
       | description : STRING
@@ -266,6 +270,9 @@
       | last_serial : LONG
       | releases : ARRAY
       | urls : ARRAY
-      |""".stripMargin.replace("\r\n", "\n")))
+      |""".stripMargin
+        .replace("\r\n", "\n")
+      )
+    )
   }
 }
diff --git a/src/test/scala/org/kafkaquery/transforms/TimeOutFunctionTest.scala b/src/test/scala/org/kafkaquery/transforms/TimeOutFunctionTest.scala
index 334f4ff..af8a768 100644
--- a/src/test/scala/org/kafkaquery/transforms/TimeOutFunctionTest.scala
+++ b/src/test/scala/org/kafkaquery/transforms/TimeOutFunctionTest.scala
@@ -16,13 +16,17 @@ class TimeOutFunctionTest extends AnyFunSuite with BeforeAndAfter {
 
   before {
     funcExecuted = false
-    timeOutFunction = new TimeOutFunction(timeoutValMs, () => {
-      funcExecuted = true
-      Unit
-    }) // 1 s timeout
+    timeOutFunction = new TimeOutFunction(
+      timeoutValMs,
+      () => {
+        funcExecuted = true
+        Unit
+      }
+    ) // 1 s timeout
 
     testHarness = new OneInputStreamOperatorTestHarness[Row, Unit](
-      new ProcessOperator(timeOutFunction))
+      new ProcessOperator(timeOutFunction)
+    )
     testHarness.open()
   }
 
diff --git a/src/test/scala/org/kafkaquery/util/KafkaRecordRetrieverTest.scala b/src/test/scala/org/kafkaquery/util/KafkaRecordRetrieverTest.scala
index 37f023c..25be403 100644
--- a/src/test/scala/org/kafkaquery/util/KafkaRecordRetrieverTest.scala
+++ b/src/test/scala/org/kafkaquery/util/KafkaRecordRetrieverTest.scala
@@ -4,10 +4,11 @@ import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.funsuite.AnyFunSuite
 
-class KafkaRecordRetrieverTest extends AnyFunSuite
-with BeforeAndAfter
-with BeforeAndAfterAll
-with EmbeddedKafka {
+class KafkaRecordRetrieverTest
+    extends AnyFunSuite
+    with BeforeAndAfter
+    with BeforeAndAfterAll
+    with EmbeddedKafka {
 
   val topicName = "someTopic"
   val someTopicData: List[String] = List(
@@ -18,7 +19,7 @@ with EmbeddedKafka {
     """{ "name": "title5", "retrieveDate": "2020-05-19T17:48:03.000Z" }"""
   )
 
-  def init(givenConfig : EmbeddedKafkaConfig) : Unit = {
+  def init(givenConfig: EmbeddedKafkaConfig): Unit = {
     implicit val config = givenConfig
     for (i <- someTopicData.indices) {
       publishStringMessageToKafka(topicName, someTopicData(i))
@@ -30,15 +31,14 @@ with EmbeddedKafka {
     zooKeeperPort = 0
   )
 
-
-
   test("Normal retrieval") {
     withRunningKafkaOnFoundPort(config) { implicit config =>
       init(config)
-      val retriever = new KafkaRecordRetriever(topicName, "localhost:"+config.kafkaPort)
+      val retriever =
+        new KafkaRecordRetriever(topicName, "localhost:" + config.kafkaPort)
       for (i <- 1 to someTopicData.length) {
         val msg = retriever.getNextRecord.getOrElse("")
-        assert(msg == someTopicData(someTopicData.length-i))
+        assert(msg == someTopicData(someTopicData.length - i))
       }
     }
   }
@@ -46,10 +46,11 @@ with EmbeddedKafka {
   test("Retrieving more than available") {
     withRunningKafkaOnFoundPort(config) { implicit config =>
       init(config)
-      val retriever = new KafkaRecordRetriever(topicName, "localhost:"+config.kafkaPort)
+      val retriever =
+        new KafkaRecordRetriever(topicName, "localhost:" + config.kafkaPort)
      for (i <- 1 to someTopicData.length) {
        val msg = retriever.getNextRecord.getOrElse("")
-        assert(msg == someTopicData(someTopicData.length-i))
+        assert(msg == someTopicData(someTopicData.length - i))
      }
      assert(retriever.getNextRecord.isEmpty)
     }
@@ -59,14 +60,17 @@ with EmbeddedKafka {
     withRunningKafkaOnFoundPort(config) { implicit config =>
       init(config)
       val maxRecords = 3
-      val retriever = new KafkaRecordRetriever(topicName, "localhost:"+config.kafkaPort, maxRecords)
+      val retriever = new KafkaRecordRetriever(
+        topicName,
+        "localhost:" + config.kafkaPort,
+        maxRecords
+      )
       for (i <- 1 to maxRecords) {
         val msg = retriever.getNextRecord.getOrElse("")
-        assert(msg == someTopicData(someTopicData.length-i))
+        assert(msg == someTopicData(someTopicData.length - i))
       }
       assert(retriever.getNextRecord.isEmpty)
     }
   }
 
-
 }
diff --git a/src/test/scala/org/kafkaquery/util/UserInputRetrieverTest.scala b/src/test/scala/org/kafkaquery/util/UserInputRetrieverTest.scala
index d0f5b9f..dd64732 100644
--- a/src/test/scala/org/kafkaquery/util/UserInputRetrieverTest.scala
+++ b/src/test/scala/org/kafkaquery/util/UserInputRetrieverTest.scala
@@ -5,11 +5,11 @@ import org.scalatest.funsuite.AnyFunSuite
 
 class UserInputRetrieverTest extends AnyFunSuite with MockitoSugar {
 
-
   test("Simple character retrieval") {
     val inputReaderMock = mock[UserInputRetriever.InputReadWrapper]
     when(inputReaderMock.readChar())
-      .thenReturn('k').andThen('o')
+      .thenReturn('k')
+      .andThen('o')
 
     UserInputRetriever.reader = inputReaderMock
 
diff --git a/src/test/scala/org/kafkaquery/util/ZookeeperSchemaExposerTest.scala b/src/test/scala/org/kafkaquery/util/ZookeeperSchemaExposerTest.scala
index 8aff9ce..4f18890 100644
--- a/src/test/scala/org/kafkaquery/util/ZookeeperSchemaExposerTest.scala
+++ b/src/test/scala/org/kafkaquery/util/ZookeeperSchemaExposerTest.scala
@@ -23,7 +23,8 @@ import org.apache.avro.Schema
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 
-class ZookeeperSchemaExposerTest extends AnyFunSuite
+class ZookeeperSchemaExposerTest
+    extends AnyFunSuite
     with BeforeAndAfter
     with BeforeAndAfterAll
     with EmbeddedKafka {
@@ -66,7 +67,8 @@ class ZookeeperSchemaExposerTest extends AnyFunSuite
   val host = "localhost:2181"
 
   override def beforeAll(): Unit = {
-    implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(zooKeeperPort = 2181)
+    implicit val config: EmbeddedKafkaConfig =
+      EmbeddedKafkaConfig(zooKeeperPort = 2181)
     EmbeddedKafka.start()
   }
 
@@ -91,42 +93,42 @@ class ZookeeperSchemaExposerTest extends AnyFunSuite
   }
 
   test("A simple schema should be correctly saved") {
-    //put the schema
+    // put the schema
     exposer.put(parsedSchema, subject)
 
-    //if I get the schema, it should be the same
+    // if I get the schema, it should be the same
     assert(exposer.get(subject).get == parsedSchema)
     assert(exposer.get(subject).get != differentParsedSchema)
   }
 
   test("A simple schema should be correctly overwritten") {
-    //ensure schema's are not the same
+    // ensure schema's are not the same
     assert(differentParsedSchema != parsedSchema)
 
-    //put the schema
+    // put the schema
     exposer.put(parsedSchema, subject)
 
-    //if I get the schema, it should be the same
+    // if I get the schema, it should be the same
     assert(exposer.get(subject).get == parsedSchema)
 
-    //put the different schema
+    // put the different schema
     exposer.put(differentParsedSchema, subject)
 
-    //if I get the schema, it should not be the same as the original
+    // if I get the schema, it should not be the same as the original
     assert(exposer.get(subject).get != parsedSchema)
     assert(exposer.get(subject).get == differentParsedSchema)
   }
 
   test("A simple schema should be correctly deleted") {
-    //put the schema
+    // put the schema
     assert(exposer.put(parsedSchema, subject))
 
-    //it should be properly deleted
+    // it should be properly deleted
     assert(exposer.delete(subject))
   }
 
   test("A simple schema cannot be deleted if it is not there") {
-    //it should be properly deleted
+    // it should be properly deleted
     assert(!exposer.delete(subject))
   }
 
@@ -139,14 +141,14 @@ class ZookeeperSchemaExposerTest extends AnyFunSuite
   }
 
   test("All schema's should be properly deleted") {
-    //put the schema
+    // put the schema
     assert(exposer.put(parsedSchema, subject))
     exposer.deleteAll()
     assert(exposer.get(subject).isEmpty)
   }
 
   test("All schema's should be properly deleted even if called twice") {
-    //put the schema
+    // put the schema
     assert(exposer.put(parsedSchema, subject))
     exposer.deleteAll()
     exposer.deleteAll()