2 changes: 1 addition & 1 deletion .scalafmt.conf
@@ -1,2 +1,2 @@
version = 3.5.1
version = 3.5.3
runner.dialect = scala212
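The scalafmt version bump is the only content change in this file; the Scala source diffs below are consistent with simply re-running the formatter under 3.5.3. As a minimal sketch, this is how such a reformat is typically driven through the sbt-scalafmt plugin declared in project/plugins.sbt (the aliases are illustrative and not part of this PR):

// Illustrative build.sbt additions: convenience aliases around the standard
// sbt-scalafmt tasks, one to reformat all sources and one for a CI-style check.
addCommandAlias("fmt", "scalafmtAll; scalafmtSbt")
addCommandAlias("fmtCheck", "scalafmtCheckAll; scalafmtSbtCheck")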
68 changes: 31 additions & 37 deletions build.sbt
@@ -1,10 +1,12 @@
// scala-steward:off
name := "kafkaquery"

scalaVersion := "2.12.15"

scalacOptions ++= Seq(
"-deprecation",
"-encoding", "UTF-8",
"-encoding",
"UTF-8",
"-feature",
"-unchecked",
"-language:higherKinds"
@@ -14,44 +16,36 @@ enablePlugins(PackPlugin)
packMain := Map(name.value -> "org.kafkaquery.CLI")
packExtraClasspath := Map(name.value -> Seq("${PROG_HOME}/udf_dependencies/*"))

lazy val flinkVersion = "1.12.7"
lazy val kafkaVersion = "3.1.0"
lazy val log4jVersion = "2.17.2"
lazy val scalatestVersion = "3.2.11"
lazy val flinkVersion = "1.12.7"
lazy val kafkaVersion = "3.1.0"
lazy val log4jVersion = "2.17.2"
lazy val scalatestVersion = "3.2.11"

libraryDependencies ++= Seq(
"org.apache.logging.log4j" % "log4j-api" % log4jVersion,
"org.apache.logging.log4j" % "log4j-core" % log4jVersion % Runtime,
"org.apache.logging.log4j" %% "log4j-api-scala" % "12.0",

"org.apache.flink" %% "flink-scala" % flinkVersion,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
"org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
"org.apache.flink" %% "flink-clients" % flinkVersion,
"org.apache.flink" % "flink-core" % flinkVersion classifier "tests",

"org.apache.zookeeper" % "zookeeper" % "3.7.0",
"io.dropwizard.metrics" % "metrics-core" % "4.2.9" % Test,

"org.scalactic" %% "scalactic" % scalatestVersion % Test,
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.mockito" %% "mockito-scala" % "1.17.5" % Test,

"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"io.github.embeddedkafka" %% "embedded-kafka" % kafkaVersion % Test,

"org.apache.avro" % "avro" % "1.11.0",

"org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
"org.apache.flink" %% "flink-table-planner-blink" % flinkVersion,
"org.apache.flink" % "flink-json" % flinkVersion,

"org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests",
"org.apache.flink" %% "flink-streaming-java" % flinkVersion % Test classifier "tests",

"com.github.scopt" %% "scopt" % "4.0.1",
"com.sksamuel.avro4s" %% "avro4s-core" % "4.0.12" % Test,
"org.apache.logging.log4j" % "log4j-api" % log4jVersion,
"org.apache.logging.log4j" % "log4j-core" % log4jVersion % Runtime,
"org.apache.logging.log4j" %% "log4j-api-scala" % "12.0",
"org.apache.flink" %% "flink-scala" % flinkVersion,
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
"org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
"org.apache.flink" %% "flink-clients" % flinkVersion,
"org.apache.flink" % "flink-core" % flinkVersion classifier "tests",
"org.apache.zookeeper" % "zookeeper" % "3.7.0",
"io.dropwizard.metrics" % "metrics-core" % "4.2.9" % Test,
"org.scalactic" %% "scalactic" % scalatestVersion % Test,
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.mockito" %% "mockito-scala" % "1.17.5" % Test,
"org.apache.kafka" % "kafka-clients" % kafkaVersion,
"io.github.embeddedkafka" %% "embedded-kafka" % kafkaVersion % Test,
"org.apache.avro" % "avro" % "1.11.0",
"org.apache.flink" %% "flink-table-api-scala-bridge" % flinkVersion,
"org.apache.flink" %% "flink-table-planner-blink" % flinkVersion,
"org.apache.flink" % "flink-json" % flinkVersion,
"org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
"org.apache.flink" %% "flink-runtime" % flinkVersion % Test classifier "tests",
"org.apache.flink" %% "flink-streaming-java" % flinkVersion % Test classifier "tests",
"com.github.scopt" %% "scopt" % "4.0.1",
"com.sksamuel.avro4s" %% "avro4s-core" % "4.0.12" % Test
)

// Fork all tasks
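The dependency list in this hunk is only reordered and reformatted; the pinned versions (Flink 1.12.7, Kafka 3.1.0, log4j 2.17.2, scalatest 3.2.11) are unchanged. The new // scala-steward:off marker at the top of the file asks Scala Steward to stop proposing version bumps here, which fits keeping the build on the Flink 1.12 line. As a hedged sketch (not part of this PR), Scala Steward's convention also allows pairing the marker with // scala-steward:on so that only a region is excluded rather than the whole file:

// Illustrative only: pairing the markers pins just the dependencies between them.
libraryDependencies ++= Seq(
  // scala-steward:off
  "org.apache.flink" %% "flink-scala" % flinkVersion, // stay on the 1.12 line
  // scala-steward:on
  "com.github.scopt" %% "scopt" % "4.0.1" // free to be updated
)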
7 changes: 4 additions & 3 deletions project/plugins.sbt
@@ -1,3 +1,4 @@
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.14")
// scala-steward:off
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6")
addSbtPlugin("org.xerial.sbt" % "sbt-pack" % "0.14")
52 changes: 37 additions & 15 deletions src/test/scala/org/kafkaquery/commands/QueryCommandTest.scala
@@ -15,17 +15,20 @@ import org.mockito.MockitoSugar
import org.scalatest.BeforeAndAfter
import org.scalatest.funsuite.AnyFunSuite

class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafka with MockitoSugar {
class QueryCommandTest
extends AnyFunSuite
with BeforeAndAfter
with EmbeddedKafka
with MockitoSugar {

test ("Query should run and produce expected results") {
test("Query should run and produce expected results") {
CollectRowSink.values.clear()

val tableName = "t1"

val zkExposerMock = mock[ZookeeperSchemaExposer]
doReturn(List("t1", "t2")).when(zkExposerMock).getAllChildren
val t1Schema = new Schema.Parser().parse(
"""
val t1Schema = new Schema.Parser().parse("""
|{ "type": "record", "name": "t1", "fields": [ { "name": "f1", "type": "string" },
|{ "name": "f2", "type": "int" } ], "rowtime": "false" }
|""".stripMargin)
@@ -42,7 +45,13 @@ class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafka
publishStringMessageToKafka(tableName, "error")

val qc = new QueryCommand(
QueryConfig(timeout = 4, query = "select f1 from t1", startStrategy = EarliestQueryStart(), ignoreParseErr = false, timeoutFunc = () => ()),
QueryConfig(
timeout = 4,
query = "select f1 from t1",
startStrategy = EarliestQueryStart(),
ignoreParseErr = false,
timeoutFunc = () => ()
),
zkExposerMock,
s"localhost:${config.kafkaPort}"
)
@@ -54,20 +63,21 @@ class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafka
case _: JobExecutionException =>
}

assertResult(util.Arrays.asList(Row.of("val1"), Row.of("val2")))(CollectRowSink.values)
assertResult(util.Arrays.asList(Row.of("val1"), Row.of("val2")))(
CollectRowSink.values
)
}

}

test ("usage of a User-defined function") {
test("usage of a User-defined function") {
CollectRowSink.values.clear()

val tableName = "t1"

val zkExposerMock = mock[ZookeeperSchemaExposer]
doReturn(List("t1", "t2")).when(zkExposerMock).getAllChildren
val t1Schema = new Schema.Parser().parse(
"""
val t1Schema = new Schema.Parser().parse("""
|{ "type": "record", "name": "t1", "fields": [ { "name": "f1", "type": "string" },
|{ "name": "f2", "type": "int" } ], "rowtime": "false" }
|""".stripMargin)
@@ -85,19 +95,27 @@ class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafka

val udfName = "MyUDF.java"

new PrintWriter(udfName) {write(
"""import org.apache.flink.table.functions.ScalarFunction;
new PrintWriter(udfName) {
write("""import org.apache.flink.table.functions.ScalarFunction;
|
|public class MyUDF extends ScalarFunction {
| public String eval(String input) {
| return input + " : udf invoked";
| }
|}""".stripMargin); close()}
|}""".stripMargin); close()
}

val udfFile = new File(udfName)
udfFile.deleteOnExit()
val qc = new QueryCommand(
QueryConfig(timeout = 4, query = "select MyUDF(f1) from t1", startStrategy = EarliestQueryStart(), ignoreParseErr = false, timeoutFunc = () => (), userFunctions = new Parser().getClassNameList(List(udfFile))),
QueryConfig(
timeout = 4,
query = "select MyUDF(f1) from t1",
startStrategy = EarliestQueryStart(),
ignoreParseErr = false,
timeoutFunc = () => (),
userFunctions = new Parser().getClassNameList(List(udfFile))
),
zkExposerMock,
s"localhost:${config.kafkaPort}"
)
@@ -109,7 +127,10 @@ class QueryCommandTest extends AnyFunSuite with BeforeAndAfter with EmbeddedKafka
case _: JobExecutionException =>
}

assertResult(util.Arrays.asList(Row.of("val1 : udf invoked"), Row.of("val2 : udf invoked")))(CollectRowSink.values)
assertResult(
util.Arrays
.asList(Row.of("val1 : udf invoked"), Row.of("val2 : udf invoked"))
)(CollectRowSink.values)
}

}
@@ -123,5 +144,6 @@ class CollectRowSink extends SinkFunction[Row] {
}

object CollectRowSink {
val values: util.List[Row] = Collections.synchronizedList(new util.ArrayList())
val values: util.List[Row] =
Collections.synchronizedList(new util.ArrayList())
}
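For context, CollectRowSink's static, synchronized list is the usual way to get results out of a Flink test job: Flink serializes the sink instances into the job, so the only state the test thread and the running sinks share is a JVM-wide field. Below is a self-contained sketch of the same pattern; it mirrors the test's sink but is not part of this PR, and it overrides the deprecated single-argument invoke to stay source-compatible with the Flink 1.12 API used by this build.

import java.util
import java.util.Collections

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.types.Row

object RowSinkSketch {
  // JVM-wide, synchronized list shared between the test thread and the
  // sink instances running inside the local Flink mini cluster.
  val collected: util.List[Row] =
    Collections.synchronizedList(new util.ArrayList[Row]())

  class Collect extends SinkFunction[Row] {
    override def invoke(value: Row): Unit = collected.add(value)
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements(Row.of("val1"), Row.of("val2")).addSink(new Collect)
    env.execute("row-sink-sketch") // blocks until the local job finishes
    assert(collected.size() == 2)
  }
}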
59 changes: 42 additions & 17 deletions src/test/scala/org/kafkaquery/parsers/ParserTest.scala
@@ -14,7 +14,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {

private val subjectName = "testSubject"
private val subjectSchema = new Schema.Parser()
.parse("""{"type":"record","name":"testCC","fields":[{"name":"s","type":"string"}]}""")
.parse(
"""{"type":"record","name":"testCC","fields":[{"name":"s","type":"string"}]}"""
)
private var parser: Parser = _
private var outStream: ByteArrayOutputStream = _

@@ -53,7 +55,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {

test("parseNothing") {
withRunningKafkaOnFoundPort(config) { implicit config =>
parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
parser.setSchemaExposer(
new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
)
assertThrows[RuntimeException] {
parser.parse(null)
}
Expand All @@ -62,7 +66,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {

test("parseDefinedPlusParseEmpty") {
withRunningKafkaOnFoundPort(config) { implicit config =>
parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
parser.setSchemaExposer(
new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
)
parser.getSchemaExposer.put(subjectSchema, subjectName)

parser.parse(Array("-t", subjectName))
@@ -72,10 +78,12 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {

test("printAllTopics") {
withRunningKafkaOnFoundPort(config) { implicit config =>
parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
parser.setSchemaExposer(
new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
)
parser.getSchemaExposer.put(subjectSchema, subjectName)

//check whether the TopicParser prints the same output after more than 1 call.
// check whether the TopicParser prints the same output after more than 1 call.
Console.withOut(outStream)(parser.printTopics())
val res = new String(outStream.toByteArray)
val otherOutStream = new java.io.ByteArrayOutputStream
@@ -88,39 +96,51 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
test("setKafkaAddress") {
withRunningKafkaOnFoundPort(config) { implicit config =>
val kafkaAddress = "someAddress"
val parserConfig = parser.parseConfig(("--kafka " + kafkaAddress + " --zookeeper \"notworkingAddress\"").split(" ")).get
val parserConfig = parser
.parseConfig(
("--kafka " + kafkaAddress + " --zookeeper \"notworkingAddress\"")
.split(" ")
)
.get
assert(parserConfig.kafkaAddress == kafkaAddress)
}
}

test("setZooKeeperAddress") {
withRunningKafkaOnFoundPort(config) { implicit config =>
val ZKAddress = "someOTherAddress"
val parserConfig = parser.parseConfig(("--zookeeper "+ ZKAddress).split(" ")).get
val parserConfig =
parser.parseConfig(("--zookeeper " + ZKAddress).split(" ")).get
assert(parserConfig.zookeeperAddress == ZKAddress)
}
}


test("updateSchemaFromFile") {
withRunningKafkaOnFoundPort(config) { implicit config =>

val fileName = "schema"
val zkAddress = s"localhost:${config.zooKeeperPort}"
val avroSchema = """{"type":"record","name":"Person","namespace":"org.codefeedr.plugins.repl.org.kafkaquery.parsers.Parser.updateSchema","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"city","type":"string"}]}"""
new PrintWriter(fileName) {write(avroSchema); close()}
val avroSchema =
"""{"type":"record","name":"Person","namespace":"org.codefeedr.plugins.repl.org.kafkaquery.parsers.Parser.updateSchema","fields":[{"name":"name","type":"string"},{"name":"age","type":"int"},{"name":"city","type":"string"}]}"""
new PrintWriter(fileName) { write(avroSchema); close() }

parser.parse(("--update-schema "+ subjectName +"=" + fileName+ " --zookeeper "+zkAddress).split(" "))
parser.parse(
("--update-schema " + subjectName + "=" + fileName + " --zookeeper " + zkAddress)
.split(" ")
)

assert(parser.getSchemaExposer.get(subjectName).get.toString.equals(avroSchema))
assert(
parser.getSchemaExposer.get(subjectName).get.toString.equals(avroSchema)
)

FileUtils.deleteQuietly(new File(fileName))
}
}

test("updateSchemaParserFailure") {
withRunningKafkaOnFoundPort(config) { implicit config =>
parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
parser.setSchemaExposer(
new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
)
val avroSchema = """incorrect Avro Format"""
Console.withErr(outStream) {
parser.updateSchema(subjectName, avroSchema)
@@ -133,7 +153,8 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {

test("checkConfigQuery") {
val args: Seq[String] = Seq(
"-q", "select * from topic"
"-q",
"select * from topic"
)

val parsed = parser.parseConfig(args)
@@ -150,7 +171,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {
val topic = "World"

withRunningKafkaOnFoundPort(config) { implicit config =>
parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
parser.setSchemaExposer(
new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
)
parser.getSchemaExposer.put(npmTableSchema, topic)

Console.withOut(outStream) {
@@ -166,7 +189,9 @@ class ParserTest extends AnyFunSuite with EmbeddedKafka with BeforeAndAfter {

test("testFailToPrintEmptyTopicSchema") {
withRunningKafkaOnFoundPort(config) { implicit config =>
parser.setSchemaExposer(new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}"))
parser.setSchemaExposer(
new ZookeeperSchemaExposer(s"localhost:${config.zooKeeperPort}")
)

Console.withErr(outStream) {

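The reformatted ParserTest cases above also document the CLI surface exercised by the tests: -q for a query, -t for a topic, --kafka and --zookeeper for addresses, and --update-schema subject=file for schema updates. A hedged sketch of driving the same parser programmatically, limited to what the tests show (the package name is assumed from the test's location and the addresses are placeholders, not values from this PR):

import org.kafkaquery.parsers.Parser

object ParserUsageSketch {
  def main(args: Array[String]): Unit = {
    val parser = new Parser()
    // parseConfig returns an Option of the parsed configuration, as the tests rely on.
    val cfg = parser.parseConfig(
      "--kafka localhost:9092 --zookeeper localhost:2181".split(" ")
    )
    cfg.foreach(c => println(s"kafka=${c.kafkaAddress} zookeeper=${c.zookeeperAddress}"))
  }
}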