Add Hudi connector (#257)
* Add Hudi connector

* Fix missing save action

* Specified Hudi spark version per profile

* Update Hudi test case

* Update Spark 2.3 Hudi version

* Fix Hudi test with dedicated spark session

* Update Hudi version for Spark 2.3

* Update Hudi version for different Spark profiles 3.0 & 2.4

* Add HudiConnectorConf Tests

* Add more test cases

Co-authored-by: huong.vuong <[email protected]>
hoaihuongbk and huong.vuong authored Jun 22, 2022
1 parent 91a5780 commit a170c57
Showing 9 changed files with 261 additions and 0 deletions.
9 changes: 9 additions & 0 deletions CONTRIBUTING.md
@@ -55,6 +55,15 @@ export SPARK_VER=2.4
./dev/test.sh
```

Note: in some cases you may get the following error:
```
java.net.BindException: Can't assign requested address: Service 'sparkDriver'
```
In that case, bind Spark to the local IP like this:
```shell
export SPARK_LOCAL_IP=127.0.0.1
```

## Styleguide

### Commit styleguide
22 changes: 22 additions & 0 deletions pom.xml
@@ -98,6 +98,9 @@
<maven.site.plugin.version>3.9.1</maven.site.plugin.version>
<nexus.staging.maven.plugin.version>1.6.8</nexus.staging.maven.plugin.version>
<typesafe.config.version>1.4.2</typesafe.config.version>
<hudi.version>0.11.0</hudi.version>
<hudi.bundle.version>spark3.2-bundle</hudi.bundle.version>
<spark.avro.version>3.0.2</spark.avro.version>
</properties>

<dependencies>
@@ -178,6 +181,18 @@
<version>${delta.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hudi</groupId>
<artifactId>hudi-${hudi.bundle.version}_${scala.compat.version}</artifactId>
<version>${hudi.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-avro_${scala.compat.version}</artifactId>
<version>${spark.avro.version}</version>
</dependency>

<!-- TYPESAFE CONFIG -->
<dependency>
<groupId>com.typesafe</groupId>
@@ -393,6 +408,8 @@
<spark.dynamodb.version>1.0.4</spark.dynamodb.version>
<delta.version>0.6.1</delta.version>
<apache.hadoop.version>2.9.2</apache.hadoop.version>
<hudi.bundle.version>spark2.4-bundle</hudi.bundle.version>
<spark.avro.version>2.4.8</spark.avro.version>
</properties>
</profile>
<profile>
@@ -404,6 +421,9 @@
<spark.dynamodb.version>1.0.4</spark.dynamodb.version>
<delta.version>0.6.1</delta.version>
<apache.hadoop.version>2.9.2</apache.hadoop.version>
<hudi.bundle.version>spark-bundle</hudi.bundle.version>
<hudi.version>0.7.0</hudi.version>
<spark.avro.version>2.4.4</spark.avro.version>
</properties>
</profile>
<profile>
@@ -413,6 +433,8 @@
<spark.compat.version>3.0</spark.compat.version>
<spark.cassandra.connector.version>3.0.0</spark.cassandra.connector.version>
<delta.version>0.7.0</delta.version>
<hudi.bundle.version>spark3.0.3-bundle</hudi.bundle.version>
<hudi.version>0.10.1</hudi.version>
</properties>
</profile>
<profile>
1 change: 1 addition & 0 deletions src/main/java/io/github/setl/enums/Storage.java
@@ -13,6 +13,7 @@ public enum Storage {
JSON("io.github.setl.storage.connector.JSONConnector"),
JDBC("io.github.setl.storage.connector.JDBCConnector"),
STRUCTURED_STREAMING("io.github.setl.storage.connector.StructuredStreamingConnector"),
HUDI("io.github.setl.storage.connector.HudiConnector"),
OTHER(null);

private String connectorName;
40 changes: 40 additions & 0 deletions src/main/scala/io/github/setl/config/HudiConnectorConf.scala
@@ -0,0 +1,40 @@
package io.github.setl.config

import io.github.setl.exception.ConfException
import org.apache.spark.sql.SaveMode

class HudiConnectorConf extends ConnectorConf {

import HudiConnectorConf._

override def getReaderConf: Map[String, String] = {
import scala.collection.JavaConverters._
settings.asScala.toMap - PATH
}

override def getWriterConf: Map[String, String] = {
import scala.collection.JavaConverters._
settings.asScala.toMap - SAVEMODE - PATH
}

def setPath(path: String): this.type = set("path", path)

def setSaveMode(saveMode: String): this.type = set("saveMode", saveMode)

def setSaveMode(saveMode: SaveMode): this.type = set("saveMode", saveMode.toString)

def getPath: String = get("path") match {
case Some(path) => path
case _ => throw new ConfException("The value of path is not set")
}

def getSaveMode: SaveMode = SaveMode.valueOf(get("saveMode", SaveMode.Append.toString))

}

object HudiConnectorConf {
def fromMap(options: Map[String, String]): HudiConnectorConf = new HudiConnectorConf().set(options)

val SAVEMODE: String = "saveMode"
val PATH: String = "path"
}
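
For reference, a minimal sketch of how this configuration class can be used; the table name and path below are hypothetical:

```scala
import io.github.setl.config.HudiConnectorConf
import org.apache.spark.sql.SaveMode

// Build the conf from an option map, then set path and save mode explicitly.
val conf = HudiConnectorConf.fromMap(Map("hoodie.table.name" -> "demo_table")) // hypothetical table name
conf.setPath("/tmp/hudi/demo_table") // hypothetical path
conf.setSaveMode(SaveMode.Overwrite)

conf.getReaderConf // every option except "path"
conf.getWriterConf // every option except "path" and "saveMode"
```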
66 changes: 66 additions & 0 deletions src/main/scala/io/github/setl/storage/connector/HudiConnector.scala
@@ -0,0 +1,66 @@
package io.github.setl.storage.connector

import com.typesafe.config.Config
import io.github.setl.config.{Conf, HudiConnectorConf}
import io.github.setl.enums.Storage
import io.github.setl.internal.HasReaderWriter
import io.github.setl.util.TypesafeConfigUtils
import org.apache.spark.sql._

class HudiConnector(val options: HudiConnectorConf) extends Connector with HasReaderWriter {
override val storage: Storage = Storage.HUDI

def this(options: Map[String, String]) = this(HudiConnectorConf.fromMap(options))

def this(config: Config) = this(TypesafeConfigUtils.getMap(config))

def this(conf: Conf) = this(conf.toMap)

override val reader: DataFrameReader = {
spark.read
.format("hudi")
.options(options.getReaderConf)
}

override val writer: DataFrame => DataFrameWriter[Row] = (df: DataFrame) => {
df.write
.format("hudi")
.mode(options.getSaveMode)
.options(options.getWriterConf)
}

/**
* Read data from the data source
*
* @return a [[DataFrame]]
*/
@throws[java.io.FileNotFoundException](s"${options.getPath} doesn't exist")
@throws[org.apache.spark.sql.AnalysisException](s"${options.getPath} doesn't exist")
override def read(): DataFrame = {
logDebug(s"Reading ${storage.toString} file in: '${options.getPath}'")
this.setJobDescription(s"Read file(s) from '${options.getPath}'")
reader.load(options.getPath)
}

/**
* Write a [[DataFrame]] into the data storage
*
* @param t a [[DataFrame]] to be saved
* @param suffix for data connectors that support suffix (e.g. [[FileConnector]],
* add the given suffix to the save path
*/
override def write(t: DataFrame, suffix: Option[String]): Unit = {
if (suffix.isDefined) logWarning("Suffix is not supported in HudiConnector")
write(t)
}

/**
* Write a [[DataFrame]] into the data storage
*
* @param t a [[DataFrame]] to be saved
*/
override def write(t: DataFrame): Unit = {
this.setJobDescription(s"Write file to ${options.getPath}")
writer(t).save(options.getPath)
}
}
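
A minimal write/read round trip with the new connector, mirroring the test suite below; the path, table name, and field names are illustrative:

```scala
import io.github.setl.SparkSessionBuilder
import io.github.setl.storage.connector.HudiConnector
import org.apache.spark.sql.SparkSession

// Hudi requires Kryo serialization, hence a dedicated session.
val spark: SparkSession = new SparkSessionBuilder().setEnv("local")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .build()
  .get()
import spark.implicits._

val connector = new HudiConnector(Map(
  "path" -> "/tmp/hudi/demo_table", // illustrative path
  "saveMode" -> "Overwrite",
  "hoodie.table.name" -> "demo_table",
  "hoodie.datasource.write.recordkey.field" -> "id",
  "hoodie.datasource.write.precombine.field" -> "ts"
))

val df = Seq((1, "a", 100L), (2, "b", 200L)).toDF("id", "value", "ts")
connector.write(df)             // df.write.format("hudi").mode(...).options(...).save(path)
val readBack = connector.read() // spark.read.format("hudi").options(...).load(path)
```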
11 changes: 11 additions & 0 deletions src/test/resources/application.conf
@@ -252,3 +252,14 @@ schemaConverter {
saveMode = "ErrorIfExists"
}
}

hudi {
test {
path = "${project.basedir}/src/test/resources/test_hudi_7"
saveMode = "Overwrite"
hoodie.table.name = "test_object_7"
hoodie.datasource.write.recordkey.field = "col1"
hoodie.datasource.write.precombine.field = "col4"
hoodie.datasource.write.table.type = "MERGE_ON_READ"
}
}
48 changes: 48 additions & 0 deletions src/test/scala/io/github/setl/config/HudiConnectorConfSuite.scala
@@ -0,0 +1,48 @@
package io.github.setl.config

import io.github.setl.exception.ConfException
import org.scalatest.funsuite.AnyFunSuite
import org.apache.spark.sql.SaveMode

class HudiConnectorConfSuite extends AnyFunSuite {
val conf = new HudiConnectorConf

test("Get/Set HudiConnectorConf") {
assert(conf.get("saveMode") === None)
conf.setSaveMode("Append")
assert(conf.getSaveMode === SaveMode.Append)
conf.setSaveMode("Overwrite")
assert(conf.getSaveMode === SaveMode.Overwrite)
conf.setSaveMode(SaveMode.Overwrite)
assert(conf.getSaveMode === SaveMode.Overwrite)

assert(conf.get("path") === None)
assertThrows[ConfException](conf.getPath)

conf.setPath("path")
assert(conf.getPath === "path")
}

test("Init HudiConnectorConf from options") {
val options : Map[String, String] = Map(
"path" -> "path",
"saveMode" -> "Append",
"hoodie.table.name" -> "test_object",
"hoodie.datasource.write.recordkey.field" -> "col1",
"hoodie.datasource.write.precombine.field" -> "col4",
"hoodie.datasource.write.table.type" -> "MERGE_ON_READ"
)

val confFromOpts: HudiConnectorConf = HudiConnectorConf.fromMap(options)
assert(confFromOpts.getPath === "path")
assert(confFromOpts.getSaveMode === SaveMode.Append)

val readerOpts = confFromOpts.getReaderConf
val writerOpts = confFromOpts.getWriterConf

// Config should not contain path or save mode
assert(!readerOpts.contains("path"))
assert(!writerOpts.contains("path"))
assert(!writerOpts.contains("saveMode"))
}
}
2 changes: 2 additions & 0 deletions src/test/scala/io/github/setl/config/Properties.scala
@@ -25,6 +25,8 @@ object Properties {

val jdbcConfig: Config = cl.getConfig("psql.test")

val hudiConfig : Config = cl.getConfig("hudi.test")

val excelConfigConnector: Config = cl.getConfig("connector.excel")
val cassandraConfigConnector: Config = cl.getConfig("connector.cassandra")
val csvConfigConnector: Config = cl.getConfig("connector.csv")
62 changes: 62 additions & 0 deletions src/test/scala/io/github/setl/storage/connector/HudiConnectorSuite.scala
@@ -0,0 +1,62 @@
package io.github.setl.storage.connector

import io.github.setl.config.{Conf, HudiConnectorConf, Properties}
import io.github.setl.{SparkSessionBuilder, SparkTestUtils, TestObject2}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.scalatest.funsuite.AnyFunSuite

import java.nio.file.Paths
import java.sql.{Date, Timestamp}

class HudiConnectorSuite extends AnyFunSuite {

val path: String = Paths.get("src", "test", "resources", "test_hudi").toFile.getAbsolutePath
val saveMode = SaveMode.Overwrite

val options: Map[String, String] = Map[String, String](
"path" -> path,
"saveMode" -> saveMode.toString,
"hoodie.table.name" -> "test_object",
"hoodie.datasource.write.recordkey.field" -> "col1",
"hoodie.datasource.write.precombine.field" -> "col4",
"hoodie.datasource.write.table.type" -> "MERGE_ON_READ"
)

val testTable: Seq[TestObject2] = Seq(
TestObject2("string", 5, 0.000000001685400132103450D, new Timestamp(1557153268000L), new Date(1557100800000L), 999999999999999999L),
TestObject2("string2", 5, 0.000000001685400132103450D, new Timestamp(1557153268000L), new Date(1557100800000L), 999999999999999999L),
TestObject2("string3", 5, 0.000000001685400132103450D, new Timestamp(1557153268000L), new Date(1557100800000L), 999999999999999999L)
)

test("Instantiation of constructors") {

// New spark session here since Hudi only supports KryoSerializer
val spark: SparkSession = new SparkSessionBuilder().setEnv("local")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.build()
.get()
assume(SparkTestUtils.checkSparkVersion("2.4"))

import spark.implicits._

val connector = new HudiConnector(HudiConnectorConf.fromMap(options))
connector.write(testTable.toDF)
assert(connector.read().collect().length == testTable.length)

val path2: String = Paths.get("src", "test", "resources", "test_hudi_2").toFile.getAbsolutePath
val options2 = options + ("path" -> path2)
val connector2 = new HudiConnector(options2)
connector2.write(testTable.toDF)
assert(connector2.read().collect().length == testTable.length)

val path3: String = Paths.get("src", "test", "resources", "test_hudi_3").toFile.getAbsolutePath
val options3 = options + ("path" -> path3)
val connector3 = new HudiConnector(Conf.fromMap(options3))
connector3.write(testTable.toDF, Some("any_"))
assert(connector3.read().collect().length == testTable.length)

val connector7 = new HudiConnector(Properties.hudiConfig)
connector7.write(testTable.toDF)
assert(connector7.read().collect().length == testTable.length)
}
}
