Skip to content

Commit

Permalink
Merge pull request #18 from jpbnetley/feature/library_updates
Browse files Browse the repository at this point in the history
Feature/library updates
  • Loading branch information
jpbnetley authored Apr 27, 2020
2 parents 7aebd98 + 4b4793e commit d828f50
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 40 deletions.
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ version := "0.1"

libraryDependencies ++= Seq(
"org.mongodb.scala" %% "mongo-scala-driver" % "2.9.0",
"io.monix" %% "monix" % "3.1.0",
"io.monix" %% "monix" % "3.2.0",
"org.typelevel" %% "cats-core" % "2.1.1",
"ch.qos.logback" % "logback-classic" % "1.2.3",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.2"
)

scalaVersion := "2.13.1"
scalaVersion := "2.13.2"
11 changes: 6 additions & 5 deletions src/main/scala/Main.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import Util.DataBuilder.Processing
import Util.Database.Database
import Util.Database
import Util.File.FileHelper
import Util.UserPrompt._
import cats.data.EitherT
Expand All @@ -9,16 +9,17 @@ import Util.ImplicitConversions._

object Main extends TaskApp {
override def run(args: List[String]): Task[ExitCode] = {

implicit val database: Database.type = Database
val database: Database.MongoDB.type = Database.MongoDB

(for {
userInput <- EitherT(promptUser())
csvFiles <- EitherT(FileHelper.getCsvFiles(userInput.folderPath, userInput.skipFiles))
orderedFiles = csvFiles.zipWithIndex.toOrderedFile()
_ <- EitherT(Processing.csvFiles(orderedFiles))
mongoClient <- EitherT.fromEither[Task](database.getMongoClient)
db <- EitherT(database.getDatabase(mongoClient))
_ <- EitherT(Processing.csvFiles(orderedFiles)(db))
} yield {
database.close()
database.close(mongoClient)
ExitCode.Success
}).valueOrF(Task.raiseError)
}
Expand Down
19 changes: 9 additions & 10 deletions src/main/scala/Util/DataBuilder/Processing.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package Util.DataBuilder

import Util.DataBuilder.DataBuilder.buildMongoDocuments
import Util.Database.Database
import Util.File.FileHelper
import cats.data.EitherT
import cats.implicits._
Expand All @@ -12,6 +11,7 @@ import scala.concurrent.Await
import scala.concurrent.duration.Duration
import Util.ErrorHandler._
import Util.Models.OrderedFile
import org.mongodb.scala.MongoDatabase

object Processing {

Expand All @@ -22,18 +22,17 @@ object Processing {
* @return Unit
*/
def csvFiles(csvFiles: List[OrderedFile])
(implicit database: Database.type): Task[Either[Exception, Unit]] = {
(implicit database: MongoDatabase): Task[Either[Exception, Unit]] = {
val insertion = {
Task.wander(csvFiles) { orderedFile =>
Task.parTraverse(csvFiles) { orderedFile =>
println(s"Processing file ${orderedFile.index + 1} of ${csvFiles.length} file name: ${orderedFile.file.name}")
(for {
fileLines <- EitherT(FileHelper.extractCsvFileLines(orderedFile.file))
headers = fileLines.headOption.map(_.split(',').toList)
lineItems = fileLines.drop(1)
collectionName = orderedFile.file.name.replace(".csv", "").toLowerCase
documentResult <- EitherT(buildMongoDocuments(headers, lineItems))
db <- EitherT(database.getDatabase)
dbInsert <- EitherT.rightT[Task, Exception](db.getCollection[Document](collectionName).insertMany(documentResult))
fileLines <- EitherT(FileHelper.extractCsvFileLines(orderedFile.file))
headers = fileLines.headOption.map(_.split(',').toList)
lineItems = fileLines.drop(1)
collectionName = orderedFile.file.name.replace(".csv", "").toLowerCase
documentResult <- EitherT(buildMongoDocuments(headers, lineItems))
dbInsert <- EitherT.rightT[Task, Exception](database.getCollection[Document](collectionName).insertMany(documentResult))
} yield {
println(s"Inserting into db: $dbInsert")
Await.result(dbInsert.toFuture(), Duration.Inf)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@ import cats.implicits._

import scala.jdk.CollectionConverters._

case object Database {

private var mongoClient: Option[MongoClient] = None
case object MongoDB {

/** builds mongo settings with auth
*
Expand Down Expand Up @@ -52,30 +50,34 @@ case object Database {

(user, password) match {
case (Some(usr), Some(pw)) => mongoSettingsBuilder(usr, pw, port, address)
case _ => mongoSettingsBuilder(address, port)
case _ => mongoSettingsBuilder(address, port)
}
}
}

/** Gets the mongo client
*
* @return Either error or MongoClient
*/
def getMongoClient: Either[Exception, MongoClient] = {
Either.fromOption(settingsBuilder(), error("environment variables not set for db")).map { settings =>
MongoClient(settings.build())
}
}

/** Returns thee database instance
*
* @return
*/
def getDatabase: Task[Either[Exception, MongoDatabase]] = Task.eval {
for {
databaseName <- Either.fromOption(sys.env.get("mongo_db_name"), error("environment variables not set for db name"))
settings <- Either.fromOption(settingsBuilder(), error("environment variables not set for db"))
mongoClient = MongoClient(settings.build()).some
mClient <- Either.fromOption(mongoClient, error("mongo client could not be defined"))
} yield mClient.getDatabase(databaseName)
def getDatabase(mongoClient: MongoClient): Task[Either[Exception, MongoDatabase]] = Task.eval {
Either.fromOption(sys.env.get("mongo_db_name"), error("environment variables not set for db name")).map { databaseName =>
mongoClient.getDatabase(databaseName)
}
}

/** Closes the mongo connection
*
*/
def close(): Unit = mongoClient match {
case Some(db) => db.close()
case None => ()
}
def close(mongoClient: MongoClient): Unit = mongoClient.close()

}
12 changes: 4 additions & 8 deletions src/main/scala/Util/UserPrompt.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,13 @@ object UserPrompt {
(for {
directory <- EitherT.fromEither[Task](FileHelper.toDirectory(scala.io.StdIn.readLine()))
skipItems <- EitherT.fromEither[Task](addSkipItems(List.empty[String]))
res <- EitherT(Task.wanderUnordered(skipItems)(FileHelper.findFile(directory, _)).map { items =>
val (errors, files) = items.separate
val userInput = UserInput(directory, files)
errors.headOption.toLeft(userInput)
})
} yield res).value
files <- EitherT(Task.parTraverse(skipItems)(FileHelper.findFile(directory, _)).map(_.sequence))
} yield UserInput(directory, files)).value

} catch {
case e: Exception =>
val message = s"Could not read user input: ${e.getMessage}"
errorT(message)
val message = s"Could not read user input"
errorT(message, e)
}
}

Expand Down

0 comments on commit d828f50

Please sign in to comment.