Commit 2da0cb3

lorenzograziano authored and andrea-rockt committed
Adding flavour for spark 3 and emr613
1 parent d1ad59d commit 2da0cb3

File tree: 83 files changed, +2560 -401 lines


.gitlab-ci.yml

Lines changed: 6 additions & 6 deletions

@@ -30,7 +30,7 @@ test-kernel:
   image: "registry.gitlab.com/agilefactory/agile.wasp2/sbt:1.9.8-8u402-b06-jdk-jammy"
   parallel:
     matrix:
-      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719"]
+      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719", "EMR_6_13"]
   script:
     - ./run-sbt-unprivileged.sh "clean coverageOn wasp-kernel/test coverageOff wasp-kernel/coverageReport wasp-kernel/coverageAggregate"
   after_script:
@@ -56,7 +56,7 @@ test-plugin:
   image: "registry.gitlab.com/agilefactory/agile.wasp2/sbt:1.9.8-8u402-b06-jdk-jammy"
   parallel:
     matrix:
-      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719"]
+      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719", "EMR_6_13"]
   script:
     - ./run-sbt-unprivileged.sh "clean coverageOn wasp-plugin/test coverageOff wasp-plugin/coverageReport wasp-plugin/coverageAggregate"
   after_script:
@@ -83,7 +83,7 @@ test-repo:
   image: "registry.gitlab.com/agilefactory/agile.wasp2/sbt:1.9.8-8u402-b06-jdk-jammy"
   parallel:
     matrix:
-      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719"]
+      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719", "EMR_6_13"]
   script:
     - ./run-sbt-unprivileged.sh "clean coverageOn wasp-repository/test coverageOff wasp-repository/coverageReport wasp-repository/coverageAggregate"
   after_script:
@@ -109,7 +109,7 @@ compile-whitelabel:
   image: "registry.gitlab.com/agilefactory/agile.wasp2/sbt:1.9.8-8u402-b06-jdk-jammy"
   parallel:
     matrix:
-      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719"]
+      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719", "EMR_6_13"]
   script:
     - ./run-sbt-unprivileged.sh "clean wasp-whitelabel/test"
   tags:
@@ -170,7 +170,7 @@ snapshot:
   image: "registry.gitlab.com/agilefactory/agile.wasp2/sbt:1.9.8-8u402-b06-jdk-jammy"
   parallel:
     matrix:
-      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719"]
+      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719", "EMR_6_13"]
   tags:
     - gitlab-org
   script:
@@ -194,7 +194,7 @@ release:
   image: "registry.gitlab.com/agilefactory/agile.wasp2/sbt:1.9.8-8u402-b06-jdk-jammy"
   parallel:
     matrix:
-      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719"]
+      - WASP_FLAVOR: [ "VANILLA2", "VANILLA2_2_12", "EMR_2_12", "CDP719", "EMR_6_13"]
   script:
     - "mkdir -p $HOME/.sbt/gpg"
     - "echo $GPG_PUB | base64 -d > $HOME/.sbt/gpg/pubring.asc"
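
Each job's parallel matrix now also runs with the new EMR_6_13 flavour. The build below reads the selected flavour through Flavor.currentFlavor(); the following Scala sketch shows what such a selector could look like, assuming it is keyed on the WASP_FLAVOR environment variable that the matrix sets. The real Flavor object lives in the project's build sources and is not part of this diff; only the names EMR212 and EMR613 are confirmed by the imports in build.sbt.

// Sketch only: a flavour selector driven by the WASP_FLAVOR environment variable.
// The mapping from matrix values to case objects is an assumption.
sealed trait Flavor
object Flavor {
  case object VANILLA2      extends Flavor
  case object VANILLA2_2_12 extends Flavor
  case object EMR212        extends Flavor
  case object CDP719        extends Flavor
  case object EMR613        extends Flavor

  def currentFlavor(): Flavor =
    sys.env.getOrElse("WASP_FLAVOR", "VANILLA2") match {
      case "VANILLA2"      => VANILLA2
      case "VANILLA2_2_12" => VANILLA2_2_12
      case "EMR_2_12"      => EMR212
      case "CDP719"        => CDP719
      case "EMR_6_13"      => EMR613
      case other           => sys.error(s"Unknown WASP_FLAVOR: $other")
    }
}

Under that assumption, a local build of the new flavour could be driven the same way the compile-whitelabel job does it, e.g. WASP_FLAVOR=EMR_6_13 ./run-sbt-unprivileged.sh "clean wasp-whitelabel/test" (hypothetical invocation).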

build.sbt

Lines changed: 129 additions & 52 deletions

@@ -1,5 +1,6 @@
 import org.checkerframework.checker.units.qual.s
 import Flavor.EMR212
+import Flavor.EMR613

 lazy val flavor = {
   val f = Flavor.currentFlavor()
@@ -35,6 +36,12 @@ lazy val model = Project("wasp-model", file("model"))
   .settings(libraryDependencies ++= dependencies.modelDependencies)

 lazy val core = Project("wasp-core", file("core"))
+  .settings(
+    Compile / unmanagedSourceDirectories += sourceDirectory.value / "main"
+      / s"java${if (flavor == EMR613) "-emr613" else "-legacy"}",
+    Compile / unmanagedSourceDirectories += sourceDirectory.value / "main"
+      / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}",
+  )
   .settings(settings.commonSettings: _*)
   .dependsOn(scala_compiler)
   .dependsOn(model)
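
The same -emr613 / -legacy switch on unmanagedSourceDirectories is repeated for several modules below (consumers-spark, the Kafka, HTTP and parallel-write plugins, the YARN auth modules, the telemetry and NiFi plugins). A hypothetical helper, not used by the commit, could factor out the repetition; the sketch assumes sbt's standard file / path syntax and the flavor value defined at the top of build.sbt:

// Hypothetical helper (not in the commit): resolve the flavour-specific source
// directory for a language, e.g. src/main/scala-emr613 vs src/main/scala-legacy.
def flavorSourceDir(base: File, language: String): File =
  base / s"$language${if (flavor == EMR613) "-emr613" else "-legacy"}"

// Equivalent to the inline setting above:
//   Compile / unmanagedSourceDirectories += flavorSourceDir(sourceDirectory.value / "main", "scala")
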
@@ -77,6 +84,10 @@ lazy val producers = Project("wasp-producers", file("producers"))
   .settings(libraryDependencies ++= dependencies.producersDependencies)

 lazy val consumers_spark = Project("wasp-consumers-spark", file("consumers-spark"))
+  .settings(
+    Compile / unmanagedSourceDirectories += sourceDirectory.value / "main"
+      / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}"
+  )
   .settings(settings.commonSettings: _*)
   .dependsOn(core)
   .settings(libraryDependencies ++= dependencies.consumersSparkDependencies)
@@ -112,14 +123,20 @@ lazy val plugin_jdbc_spark = Project("wasp-plugin-jdbc-spark", file("plugin-jdbc

 lazy val plugin_kafka_spark = Project("wasp-plugin-kafka-spark", file("plugin-kafka-spark"))
   .settings(settings.commonSettings: _*)
+  .settings(
+    Test / unmanagedSourceDirectories += sourceDirectory.value / "test"
+      / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}",
+  )
   .dependsOn(consumers_spark % "compile->compile;test->test")
   .settings(libraryDependencies ++= dependencies.pluginKafkaSparkDependencies)

 lazy val plugin_kafka_spark_old = Project("wasp-plugin-kafka-spark-old", file("plugin-kafka-spark-old"))
   .settings(settings.commonSettings: _*)
   .settings(
     Compile / scalaSource := baseDirectory.value / ".." / "plugin-kafka-spark" / "src" / "main" / "scala",
-    Test / scalaSource := baseDirectory.value / ".." / "plugin-kafka-spark" / "src" / "test" / "scala"
+    Test / scalaSource := baseDirectory.value / ".." / "plugin-kafka-spark" / "src" / "test" / "scala",
+    Test / unmanagedSourceDirectories += baseDirectory.value / ".." / "plugin-kafka-spark" / "src" / "test"
+      / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}"
   )
   .dependsOn(consumers_spark % "compile->compile;test->test")
   .settings(libraryDependencies ++= dependencies.pluginKafkaSparkOldDependencies)
@@ -145,6 +162,10 @@ lazy val plugin_mailer_spark = Project("wasp-plugin-mailer-spark", file("plugin-
   .settings(libraryDependencies ++= dependencies.pluginMailerSparkDependencies)

 lazy val plugin_http_spark = Project("wasp-plugin-http-spark", file("plugin-http-spark"))
+  .settings(
+    Test / unmanagedSourceDirectories += sourceDirectory.value / "test"
+      / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}"
+  )
   .settings(settings.commonSettings: _*)
   .dependsOn(consumers_spark % "compile->compile;test->test")
   .settings(libraryDependencies ++= dependencies.pluginHttpSparkDependencies)
@@ -160,6 +181,10 @@ lazy val microservice_catalog = Project("wasp-microservice-catalog", file("micro
   .dependsOn(consumers_spark % "compile->compile;test->test")

 lazy val plugin_parallel_write_spark = Project("wasp-plugin-parallel-write-spark", file("plugin-parallel-write-spark"))
+  .settings(
+    Test / unmanagedSourceDirectories += sourceDirectory.value / "test"
+      / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}"
+  )
   .settings(Defaults.itSettings)
   .settings(settings.commonSettings: _*)
   .settings(settings.disableParallelTests)
@@ -169,14 +194,24 @@ lazy val plugin_parallel_write_spark = Project("wasp-plugin-parallel-write-spark

 /* Yarn */

-lazy val yarn_auth_hdfs = Project("wasp-yarn-auth-hdfs", file("yarn/auth/hdfs"))
-  .settings(settings.commonSettings: _*)
-  .settings(dependencies.kmsTest: _*)
-  .settings(libraryDependencies ++= dependencies.yarnAuthHdfsDependencies)
-
-lazy val yarn_auth_hbase = Project("wasp-yarn-auth-hbase", file("yarn/auth/hbase"))
-  .settings(settings.commonSettings: _*)
-  .settings(libraryDependencies ++= dependencies.yarnAuthHBaseDependencies)
+lazy val yarn_auth_hdfs =
+  Project("wasp-yarn-auth-hdfs", file("yarn/auth/hdfs"))
+    .settings(
+      Compile / unmanagedSourceDirectories += sourceDirectory.value / "main"
+        / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}",
+    )
+    .settings(settings.commonSettings: _*)
+    .settings(dependencies.kmsTest: _*)
+    .settings(libraryDependencies ++= dependencies.yarnAuthHdfsDependencies)
+
+lazy val yarn_auth_hbase =
+  Project("wasp-yarn-auth-hbase", file("yarn/auth/hbase"))
+    .settings(
+      Compile / unmanagedSourceDirectories += sourceDirectory.value / "main"
+        / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}"
+    )
+    .settings(settings.commonSettings: _*)
+    .settings(libraryDependencies ++= dependencies.yarnAuthHBaseDependencies)

 lazy val yarn_auth = Project("wasp-yarn-auth", file("yarn/auth"))
   .settings(settings.commonSettings: _*)
@@ -187,10 +222,18 @@ lazy val yarn = Project("wasp-yarn", file("yarn"))
   .aggregate(yarn_auth)

 lazy val spark_telemetry_plugin = Project("wasp-spark-telemetry-plugin", file("spark/telemetry-plugin"))
+  .settings(
+    Compile / unmanagedSourceDirectories += sourceDirectory.value / "main"
+      / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}"
+  )
   .settings(settings.commonSettings: _*)
   .settings(libraryDependencies ++= dependencies.sparkTelemetryPluginDependencies)

 lazy val spark_nifi_plugin = Project("wasp-spark-nifi-plugin", file("spark/nifi-plugin"))
+  .settings(
+    Compile / unmanagedSourceDirectories += sourceDirectory.value / "main"
+      / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}"
+  )
   .settings(settings.commonSettings: _*)
   .settings(libraryDependencies ++= dependencies.sparkNifiPluginDependencies)
   .dependsOn(consumers_spark)
@@ -230,26 +273,40 @@ lazy val kernel = project
     aws
   )

-lazy val plugin = project
-  .withId("wasp-plugin")
-  .settings(settings.commonSettings: _*)
-  .aggregate(
-    plugin_console_spark,
-    plugin_hbase_spark,
-    plugin_plain_hbase_writer_spark,
-    plugin_jdbc_spark,
-    plugin_kafka_spark,
-    plugin_kafka_spark_old,
-    plugin_raw_spark,
-    plugin_solr_spark,
-    plugin_cdc_spark,
-    plugin_parallel_write_spark,
-    plugin_mailer_spark,
-    plugin_http_spark,
-    plugin_mongo_spark,
-    microservice_catalog,
-    plugin_elastic_spark
-  )
+lazy val plugin =
+  if (flavor != EMR613) {
+    project
+      .withId("wasp-plugin")
+      .settings(settings.commonSettings: _*)
+      .aggregate(
+        plugin_console_spark,
+        plugin_hbase_spark,
+        plugin_plain_hbase_writer_spark,
+        plugin_jdbc_spark,
+        plugin_kafka_spark,
+        plugin_kafka_spark_old,
+        plugin_raw_spark,
+        plugin_solr_spark,
+        plugin_cdc_spark,
+        plugin_parallel_write_spark,
+        plugin_mailer_spark,
+        plugin_http_spark,
+        plugin_mongo_spark,
+        microservice_catalog,
+        plugin_elastic_spark
+      )
+  }
+  else {
+    project
+      .withId("wasp-plugin")
+      .settings(settings.commonSettings: _*)
+      .aggregate(
+        plugin_parallel_write_spark,
+        plugin_http_spark,
+        plugin_kafka_spark
+      )
+
+  }

 /* Framework + Plugins */
 lazy val wasp = Project("wasp", file("."))
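
Under the EMR_6_13 flavour only the parallel-write, HTTP and Kafka plugins are aggregated, so the wasp-plugin/test CI jobs build just the Spark-3-compatible subset. The two branches duplicate the withId/settings boilerplate; a behaviour-equivalent alternative, shown only as a sketch and not what the commit does, would vary nothing but the aggregated list:

// Hypothetical refactoring: one project definition, flavour-dependent aggregate list.
lazy val pluginAggregates: Seq[ProjectReference] =
  if (flavor == EMR613)
    Seq(plugin_parallel_write_spark, plugin_http_spark, plugin_kafka_spark)
  else
    Seq(
      plugin_console_spark, plugin_hbase_spark, plugin_plain_hbase_writer_spark,
      plugin_jdbc_spark, plugin_kafka_spark, plugin_kafka_spark_old, plugin_raw_spark,
      plugin_solr_spark, plugin_cdc_spark, plugin_parallel_write_spark,
      plugin_mailer_spark, plugin_http_spark, plugin_mongo_spark,
      microservice_catalog, plugin_elastic_spark
    )

lazy val plugin = project
  .withId("wasp-plugin")
  .settings(settings.commonSettings: _*)
  .aggregate(pluginAggregates: _*)
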
@@ -287,28 +344,48 @@ lazy val whiteLabelProducers = Project("wasp-whitelabel-producers", file("whitel
   .settings(dependencies.whitelabelProducerScriptClasspath)
   .enablePlugins(JavaAppPackaging)

-lazy val whiteLabelConsumersSpark = Project("wasp-whitelabel-consumers-spark", file("whitelabel/consumers-spark"))
-  .settings(settings.commonSettings: _*)
-  .dependsOn(whiteLabelModels)
-  .dependsOn(consumers_spark)
-  .dependsOn(repository_mongo)
-  .dependsOn(plugin_console_spark)
-  .dependsOn(plugin_hbase_spark)
-  .dependsOn(plugin_jdbc_spark)
-  .dependsOn(plugin_kafka_spark)
-  .dependsOn(plugin_mailer_spark)
-  .dependsOn(plugin_raw_spark)
-  .dependsOn(plugin_solr_spark)
-  .dependsOn(plugin_mongo_spark)
-  .dependsOn(plugin_http_spark)
-  .dependsOn(plugin_cdc_spark)
-  .dependsOn(spark_telemetry_plugin)
-  .dependsOn(spark_nifi_plugin)
-  .dependsOn(plugin_parallel_write_spark)
-  .dependsOn(aws_auth_temporary_credentials)
-  .settings(libraryDependencies ++= dependencies.whitelabelSparkConsumerDependencies)
-  .enablePlugins(JavaAppPackaging)
-  .settings(dependencies.whitelabelSparkConsumerScriptClasspath)
+lazy val whiteLabelConsumersSpark =
+  if (flavor != EMR613) {
+    Project("wasp-whitelabel-consumers-spark", file("whitelabel/consumers-spark"))
+      .settings(settings.commonSettings: _*)
+      .dependsOn(whiteLabelModels)
+      .dependsOn(consumers_spark)
+      .dependsOn(repository_mongo)
+      .dependsOn(plugin_console_spark)
+      .dependsOn(plugin_hbase_spark)
+      .dependsOn(plugin_jdbc_spark)
+      .dependsOn(plugin_kafka_spark)
+      .dependsOn(plugin_mailer_spark)
+      .dependsOn(plugin_raw_spark)
+      .dependsOn(plugin_solr_spark)
+      .dependsOn(plugin_mongo_spark)
+      .dependsOn(plugin_http_spark)
+      .dependsOn(plugin_cdc_spark)
+      .dependsOn(spark_telemetry_plugin)
+      .dependsOn(spark_nifi_plugin)
+      .dependsOn(plugin_parallel_write_spark)
+      .dependsOn(aws_auth_temporary_credentials)
+      .settings(libraryDependencies ++= dependencies.whitelabelSparkConsumerDependencies)
+      .enablePlugins(JavaAppPackaging)
+      .settings(dependencies.whitelabelSparkConsumerScriptClasspath)
+  } else {
+    Project("wasp-whitelabel-consumers-spark", file("whitelabel/consumers-spark"))
+      .settings(settings.commonSettings: _*)
+      .settings(
+        Test / unmanagedSourceDirectories += sourceDirectory.value / "test"
+          / s"scala${if (flavor == EMR613) "-emr613" else "-legacy"}",
+      )
+      .dependsOn(whiteLabelModels)
+      .dependsOn(consumers_spark)
+      .dependsOn(repository_mongo)
+      .dependsOn(plugin_http_spark)
+      .dependsOn(plugin_parallel_write_spark)
+      .dependsOn(plugin_kafka_spark)
+      .dependsOn(aws_auth_temporary_credentials)
+      .settings(libraryDependencies ++= dependencies.whitelabelSparkConsumerDependencies)
+      .enablePlugins(JavaAppPackaging)
+      .settings(dependencies.whitelabelSparkConsumerScriptClasspath)
+  }

 lazy val whiteLabelSingleNode = project
   .withId("wasp-whitelabel-singlenode")
@@ -348,7 +425,7 @@ lazy val aws_auth_temporary_credentials =
     .settings(settings.commonSettings: _*)
     .settings(libraryDependencies ++= dependencies.scalaTestDependencies)
     .settings(libraryDependencies ++= dependencies.awsAuth)
-    .settings(Test / skip := flavor != EMR212) //only test this in EMR212 build
+    .settings(Test / skip := !(flavor == EMR212 || flavor == EMR613)) // only run these tests in the EMR212 and EMR613 builds

 lazy val aws_auth = Project("wasp-aws-auth", file("aws/auth"))
   .settings(settings.commonSettings: _*)

New file (path not shown): CompatibilityTelemetryActor

@@ -0,0 +1,23 @@
+package it.agilelab.bigdata.wasp.consumer.spark.streaming.actor.telemetry
+
+import it.agilelab.bigdata.wasp.consumers.spark.streaming.actor.telemetry.TelemetryActor
+import it.agilelab.bigdata.wasp.core.messages.TelemetryMessageJsonProtocol._
+import it.agilelab.bigdata.wasp.core.messages.TelemetryMessageSourcesSummary
+import org.json4s.native.JsonMethods._
+import org.json4s.{DefaultFormats, Extraction}
+import spray.json._
+
+
+
+trait CompatibilityTelemetryActor {
+  self: TelemetryActor =>
+  @com.github.ghik.silencer.silent("deprecated")
+  protected def toMessage(message: Any): String = {
+    message match {
+      case data: Map[_, _] =>
+        implicit val formats: DefaultFormats.type = DefaultFormats
+        compact(render(Extraction.decompose(data.asInstanceOf[Map[String, Any]])))
+      case data: TelemetryMessageSourcesSummary => data.toJson.toString()
+    }
+  }
+}

New file (path not shown): CompatibilityAvroDeserializerExpression

@@ -0,0 +1,10 @@
+package it.agilelab.bigdata.wasp.consumers.spark.utils
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+trait CompatibilityAvroDeserializerExpression {
+  self: AvroDeserializerExpression =>
+  override protected def withNewChildInternal(newChild: Expression): Expression = {
+    copy(child = newChild)
+  }
+}
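
The Compatibility* traits added by this commit are mixed into the corresponding classes through a self-type, so the withNewChildInternal override, an API that newer Spark 3.x releases require on custom Catalyst tree nodes but that the Spark 2 lineage does not have, can live only in the flavour-specific source tree wired up in build.sbt. A self-contained illustration of the pattern, with made-up names standing in for the real AvroDeserializerExpression:

// Illustrative stand-ins only; the real types are Catalyst expressions in WASP.
trait MiniExpression {
  def child: MiniExpression
  def withNewChildInternal(newChild: MiniExpression): MiniExpression
}

case object MiniLeaf extends MiniExpression {
  def child: MiniExpression = this
  def withNewChildInternal(newChild: MiniExpression): MiniExpression = this
}

// The compatibility trait can only be mixed into MiniDeserializer (self-type),
// which is what lets it call the case class copy method.
trait CompatibilityMiniDeserializer { self: MiniDeserializer =>
  def withNewChildInternal(newChild: MiniExpression): MiniExpression =
    copy(child = newChild)
}

case class MiniDeserializer(child: MiniExpression)
  extends MiniExpression
  with CompatibilityMiniDeserializer

// MiniDeserializer(MiniLeaf).withNewChildInternal(MiniLeaf) rebuilds the node with
// the new child, mirroring what copy(child = newChild) does in the trait above.
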

New file (path not shown): CompatibilityAvroSerializerExpression

@@ -0,0 +1,25 @@
+package it.agilelab.bigdata.wasp.consumers.spark.utils
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+
+trait CompatibilityAvroSerializerExpression {
+  self: AvroSerializerExpression =>
+  override protected def withNewChildInternal(newChild: Expression): Expression = {
+    copy(child = newChild)
+  }
+
+  def serializeTimestamp =
+    (item: Any) =>
+      DateTimeUtils.toJavaTimestamp(item.asInstanceOf[Long]).getTime
+  def serializeDateType =
+    (item: Any) =>
+      if (item == null) null
+      else {
+        println("---item")
+        println(item)
+        println(item.getClass.getName)
+        DateTimeUtils.toJavaDate(item.asInstanceOf[Int]).getTime
+      }
+
+}
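
serializeTimestamp and serializeDateType take Catalyst's internal values (timestamps stored as microseconds since the epoch, dates as days since the epoch) and turn them into epoch milliseconds by round-tripping through java.sql.Timestamp and java.sql.Date. A rough standalone equivalent, assuming UTC and ignoring the default-time-zone handling that DateTimeUtils.toJavaDate applies:

// Sketch only: the arithmetic behind the conversions above, under a UTC assumption.
def timestampMicrosToMillis(micros: Long): Long = micros / 1000L    // microseconds -> milliseconds
def dateDaysToMillis(days: Int): Long = days.toLong * 86400000L     // days -> milliseconds
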

New file (path not shown): CompatibilityCompressExpression

@@ -0,0 +1,10 @@
+package it.agilelab.bigdata.wasp.consumers.spark.utils
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+trait CompatibilityCompressExpression {
+  self: CompressExpression =>
+  override protected def withNewChildInternal(newChild: Expression): Expression = {
+    copy(child = newChild)
+  }
+}
