diff --git a/build.sbt b/build.sbt index ee79e1c..77c22e8 100644 --- a/build.sbt +++ b/build.sbt @@ -4,10 +4,10 @@ version := "0.1" libraryDependencies ++= Seq( "org.mongodb.scala" %% "mongo-scala-driver" % "2.9.0", - "io.monix" %% "monix" % "3.1.0", + "io.monix" %% "monix" % "3.2.0", "org.typelevel" %% "cats-core" % "2.1.1", "ch.qos.logback" % "logback-classic" % "1.2.3", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2" ) -scalaVersion := "2.13.1" \ No newline at end of file +scalaVersion := "2.13.2" diff --git a/src/main/scala/Main.scala b/src/main/scala/Main.scala index f49c70c..a82253a 100644 --- a/src/main/scala/Main.scala +++ b/src/main/scala/Main.scala @@ -1,5 +1,5 @@ import Util.DataBuilder.Processing -import Util.Database.Database +import Util.Database import Util.File.FileHelper import Util.UserPrompt._ import cats.data.EitherT @@ -9,16 +9,17 @@ import Util.ImplicitConversions._ object Main extends TaskApp { override def run(args: List[String]): Task[ExitCode] = { - - implicit val database: Database.type = Database + val database: Database.MongoDB.type = Database.MongoDB (for { userInput <- EitherT(promptUser()) csvFiles <- EitherT(FileHelper.getCsvFiles(userInput.folderPath, userInput.skipFiles)) orderedFiles = csvFiles.zipWithIndex.toOrderedFile() - _ <- EitherT(Processing.csvFiles(orderedFiles)) + mongoClient <- EitherT.fromEither[Task](database.getMongoClient) + db <- EitherT(database.getDatabase(mongoClient)) + _ <- EitherT(Processing.csvFiles(orderedFiles)(db)) } yield { - database.close() + database.close(mongoClient) ExitCode.Success }).valueOrF(Task.raiseError) } diff --git a/src/main/scala/Util/DataBuilder/Processing.scala b/src/main/scala/Util/DataBuilder/Processing.scala index 3f45543..cdaf59d 100644 --- a/src/main/scala/Util/DataBuilder/Processing.scala +++ b/src/main/scala/Util/DataBuilder/Processing.scala @@ -1,7 +1,6 @@ package Util.DataBuilder import Util.DataBuilder.DataBuilder.buildMongoDocuments -import Util.Database.Database import Util.File.FileHelper import cats.data.EitherT import cats.implicits._ @@ -12,6 +11,7 @@ import scala.concurrent.Await import scala.concurrent.duration.Duration import Util.ErrorHandler._ import Util.Models.OrderedFile +import org.mongodb.scala.MongoDatabase object Processing { @@ -22,18 +22,17 @@ object Processing { * @return Unit */ def csvFiles(csvFiles: List[OrderedFile]) - (implicit database: Database.type): Task[Either[Exception, Unit]] = { + (implicit database: MongoDatabase): Task[Either[Exception, Unit]] = { val insertion = { - Task.wander(csvFiles) { orderedFile => + Task.parTraverse(csvFiles) { orderedFile => println(s"Processing file ${orderedFile.index + 1} of ${csvFiles.length} file name: ${orderedFile.file.name}") (for { - fileLines <- EitherT(FileHelper.extractCsvFileLines(orderedFile.file)) - headers = fileLines.headOption.map(_.split(',').toList) - lineItems = fileLines.drop(1) - collectionName = orderedFile.file.name.replace(".csv", "").toLowerCase - documentResult <- EitherT(buildMongoDocuments(headers, lineItems)) - db <- EitherT(database.getDatabase) - dbInsert <- EitherT.rightT[Task, Exception](db.getCollection[Document](collectionName).insertMany(documentResult)) + fileLines <- EitherT(FileHelper.extractCsvFileLines(orderedFile.file)) + headers = fileLines.headOption.map(_.split(',').toList) + lineItems = fileLines.drop(1) + collectionName = orderedFile.file.name.replace(".csv", "").toLowerCase + documentResult <- EitherT(buildMongoDocuments(headers, lineItems)) + dbInsert <- EitherT.rightT[Task, Exception](database.getCollection[Document](collectionName).insertMany(documentResult)) } yield { println(s"Inserting into db: $dbInsert") Await.result(dbInsert.toFuture(), Duration.Inf) diff --git a/src/main/scala/Util/Database/Database.scala b/src/main/scala/Util/Database/MongoDB.scala similarity index 72% rename from src/main/scala/Util/Database/Database.scala rename to src/main/scala/Util/Database/MongoDB.scala index 2b0559f..f1cf0cf 100644 --- a/src/main/scala/Util/Database/Database.scala +++ b/src/main/scala/Util/Database/MongoDB.scala @@ -7,9 +7,7 @@ import cats.implicits._ import scala.jdk.CollectionConverters._ -case object Database { - - private var mongoClient: Option[MongoClient] = None +case object MongoDB { /** builds mongo settings with auth * @@ -52,30 +50,34 @@ case object Database { (user, password) match { case (Some(usr), Some(pw)) => mongoSettingsBuilder(usr, pw, port, address) - case _ => mongoSettingsBuilder(address, port) + case _ => mongoSettingsBuilder(address, port) } } } + /** Gets the mongo client + * + * @return Either error or MongoClient + */ + def getMongoClient: Either[Exception, MongoClient] = { + Either.fromOption(settingsBuilder(), error("environment variables not set for db")).map { settings => + MongoClient(settings.build()) + } + } + /** Returns thee database instance * * @return */ - def getDatabase: Task[Either[Exception, MongoDatabase]] = Task.eval { - for { - databaseName <- Either.fromOption(sys.env.get("mongo_db_name"), error("environment variables not set for db name")) - settings <- Either.fromOption(settingsBuilder(), error("environment variables not set for db")) - mongoClient = MongoClient(settings.build()).some - mClient <- Either.fromOption(mongoClient, error("mongo client could not be defined")) - } yield mClient.getDatabase(databaseName) + def getDatabase(mongoClient: MongoClient): Task[Either[Exception, MongoDatabase]] = Task.eval { + Either.fromOption(sys.env.get("mongo_db_name"), error("environment variables not set for db name")).map { databaseName => + mongoClient.getDatabase(databaseName) + } } /** Closes the mongo connection * */ - def close(): Unit = mongoClient match { - case Some(db) => db.close() - case None => () - } + def close(mongoClient: MongoClient): Unit = mongoClient.close() } diff --git a/src/main/scala/Util/UserPrompt.scala b/src/main/scala/Util/UserPrompt.scala index 3db1574..ee3aa68 100644 --- a/src/main/scala/Util/UserPrompt.scala +++ b/src/main/scala/Util/UserPrompt.scala @@ -21,17 +21,13 @@ object UserPrompt { (for { directory <- EitherT.fromEither[Task](FileHelper.toDirectory(scala.io.StdIn.readLine())) skipItems <- EitherT.fromEither[Task](addSkipItems(List.empty[String])) - res <- EitherT(Task.wanderUnordered(skipItems)(FileHelper.findFile(directory, _)).map { items => - val (errors, files) = items.separate - val userInput = UserInput(directory, files) - errors.headOption.toLeft(userInput) - }) - } yield res).value + files <- EitherT(Task.parTraverse(skipItems)(FileHelper.findFile(directory, _)).map(_.sequence)) + } yield UserInput(directory, files)).value } catch { case e: Exception => - val message = s"Could not read user input: ${e.getMessage}" - errorT(message) + val message = s"Could not read user input" + errorT(message, e) } }