
Commit 584cb74

replsanjuveapen authored and committed
Added BookRecommender
1 parent f089a97 commit 584cb74

File tree

5 files changed: +202 -25 lines changed


build.sbt

Lines changed: 2 additions & 0 deletions
@@ -17,6 +17,8 @@ libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.3"
 
 libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
 
+libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.3"
+
 libraryDependencies += "org.apache.kafka" %% "kafka" % "2.2.0"
 
 libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.4.0"

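The only functional addition here is the spark-mllib artifact, which is what the ALS-based recommender introduced by this commit compiles against (the artifact carries both the DataFrame-based org.apache.spark.ml API and the older RDD-based spark.mllib API). A sketch of how the Spark entries would read after the change, assuming the lines shown in the hunk above are the only Spark dependencies in build.sbt:

// Spark dependencies after this commit, all pinned to 2.4.3
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.3"
libraryDependencies += "org.apache.spark" %% "spark-mllib" % "2.4.3" // new: provides org.apache.spark.ml.recommendation.ALS
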
project/plugins.sbt

Lines changed: 2 additions & 0 deletions
@@ -1 +1,3 @@
 logLevel := Level.Warn
+
+addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "5.2.4")

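sbteclipse contributes an eclipse task to sbt that generates Eclipse project metadata (.project and .classpath) from the build definition. Typical usage after this change, assuming the plugin resolves from the default sbt plugin repositories:

# generate Eclipse project files for every project in the build
sbt eclipse
# optionally also download and attach library sources
sbt "eclipse with-source=true"
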
src/main/scala/org/repl/kafkasparkmongo/BXBookUserRatingsLoader.scala

Lines changed: 20 additions & 24 deletions
@@ -1,18 +1,18 @@
 package org.repl.kafkasparkmongo
 
-import java.util.{Arrays, Properties}
+import java.util.{ Arrays, Properties }
 
 import com.mongodb.spark._
-import com.mongodb.spark.config.{ReadConfig, WriteConfig}
+import com.mongodb.spark.config.{ ReadConfig, WriteConfig }
 import com.mongodb.spark.sql._
 import com.mongodb.spark.sql.fieldTypes.ObjectId
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
-import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
-import org.apache.spark.streaming.{Seconds, StreamingContext}
-import org.apache.spark.{SparkConf, SparkContext}
-import org.repl.kafkasparkmongo.util.{SimpleKafkaClient, SparkKafkaSink}
-import org.apache.spark.sql.functions.{col, concat, lit, lower, split, substring, typedLit, udf}
+import org.apache.spark.sql.types.{ IntegerType, StringType, StructField, StructType }
+import org.apache.spark.streaming.kafka010.{ ConsumerStrategies, KafkaUtils, LocationStrategies }
+import org.apache.spark.streaming.{ Seconds, StreamingContext }
+import org.apache.spark.{ SparkConf, SparkContext }
+import org.repl.kafkasparkmongo.util.{ SimpleKafkaClient, SparkKafkaSink }
+import org.apache.spark.sql.functions.{ col, concat, lit, lower, split, substring, typedLit, udf }
 import org.mindrot.jbcrypt.BCrypt
 
 import scala.util.parsing.json.JSONObject
@@ -35,9 +35,7 @@ object BXBookUserRatingsLoader {
       LocationStrategies.PreferConsistent,
       ConsumerStrategies.Subscribe[String, String](
         Arrays.asList(topic),
-        props.asInstanceOf[java.util.Map[String, Object]]
-      )
-    )
+        props.asInstanceOf[java.util.Map[String, Object]]))
 
     val writeConfig = WriteConfig(Map("uri" -> "mongodb://lms:[email protected]/lms_db.UserBookRating"))
 
@@ -50,7 +48,7 @@ object BXBookUserRatingsLoader {
     // the number of partitions of the topic (which also happens to be four.)
     println("*** " + r.getNumPartitions + " partitions")
     r.glom().foreach(a => println("*** partition size = " + a.size))
-    val toObjectId = udf[ObjectId,String](new ObjectId(_))
+    val toObjectId = udf[ObjectId, String](new ObjectId(_))
     val df = spark.read.json(r).withColumn("uid", toObjectId(col("userId")))
     df.printSchema()
     //r.foreach(s => println(s))
@@ -94,12 +92,12 @@ object BXBookUserRatingsLoader {
   }
 
   /**
-    * Publish some data to a topic. Encapsulated here to ensure serializable.
-    *
-    * @param sc
-    * @param topic
-    * @param config
-    */
+   * Publish some data to a topic. Encapsulated here to ensure serializable.
+   *
+   * @param sc
+   * @param topic
+   * @param config
+   */
   def send(sc: SparkContext, topic: String, config: Properties): Unit = {
     val spark = SparkSession.builder.config(sc.getConf).getOrCreate()
 
@@ -113,8 +111,7 @@ object BXBookUserRatingsLoader {
     val mySchema = StructType(Array(
       StructField("usernum", StringType),
       StructField("ISBN", StringType),
-      StructField("rating", StringType)
-    ))
+      StructField("rating", StringType)))
     val dataFrame = spark.sqlContext
       .read
       .format("csv")
@@ -138,8 +135,7 @@ object BXBookUserRatingsLoader {
       col("firstname"),
       col("lastname"),
       col("ISBN"),
-      col("ratingNum").as("rating")
-    )
+      col("ratingNum").as("rating"))
     println("JoinedDF schema")
     joinedDF.printSchema()
     println("BookRatings count in joined dataframe: " + joinedDF.count())
@@ -159,9 +155,9 @@ object BXBookUserRatingsLoader {
           kafkaSink.value.send(topic, userId, JSONObject(mutableRowMap.toMap).toString())
         } catch {
           case npe: NullPointerException => {
-              println("Got NPE for rowMap " + rowMap)
+            println("Got NPE for rowMap " + rowMap)
           }
-          case e : Throwable => {
+          case e: Throwable => {
             println(e)
           }
         }
src/main/scala/org/repl/kafkasparkmongo/samples/BookRecommendation.scala

Lines changed: 109 additions & 0 deletions
@@ -0,0 +1,109 @@
+package org.repl.kafkasparkmongo.samples
+
+import com.mongodb.spark._
+import com.mongodb.spark.config.{ReadConfig, WriteConfig}
+import com.mongodb.spark.sql._
+import com.mongodb.spark.sql.fieldTypes.ObjectId
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Row, SaveMode, SparkSession, functions}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
+import org.apache.spark.sql.functions.{col, concat, desc, lit, lower, split, substring, typedLit, udf}
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.ml.evaluation.RegressionEvaluator
+import org.apache.spark.ml.feature.StringIndexer
+import org.apache.spark.ml.recommendation.ALS
+import org.apache.spark.rdd.RDD
+import org.bson.Document
+
+object BookRecommendation {
+
+  case class students_cc(id: Int,
+                         year_graduated: String,
+                         name: String)
+
+  def main(args: Array[String]): Unit = {
+    //Start the Spark context
+    val conf = new SparkConf().setAppName("SimpleStreamingFromRDD").setMaster("local[4]")
+    val sc = new SparkContext(conf)
+    val ssc = new StreamingContext(sc, Seconds(1))
+
+    val spark = SparkSession.builder.config(sc.getConf).getOrCreate()
+
+    //Ways to read mongodb
+    //val usersDF = spark.read.mongo(ReadConfig(Map("uri" -> "mongodb://lms:[email protected]/lms_db.User")))
+    //println("Users count: " + usersDF.count())
+    //println("usersDF schema")
+    //usersDF.printSchema()
+    //val df = spark.sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("spark.mongodb.input.uri", "mongodb://lms:[email protected]/lms_db.User").load()
+    //usersDF.printSchema()
+
+    //Another way to read data from MongoDB
+    val bookDF = MongoSpark.load(spark.sparkContext, ReadConfig(Map("uri" -> "mongodb://lms:[email protected]/lms_db.Book"))).toDF()
+      .drop("Image-URL-L", "Image-URL-M", "Image-URL-S")
+    //uDF.show(false)
+    //uDF.printSchema()
+    //val aggregatedRdd = uDF.withPipeline(Seq(Document.parse("{ '$match': { 'Book-Title' : {'$regex' : '^Sherlock' } } }")))
+    //println(aggregatedRdd.count)
+
+    //Simple Popularity based Recommendation System
+    val bookRatingDF = MongoSpark.load(spark.sparkContext, ReadConfig(Map("uri" -> "mongodb://lms:[email protected]/lms_db.UserBookRating"))).toDF()
+    val raters = bookRatingDF.groupBy(functions.col("ISBN")).agg(functions.count("rating").as("count"))
+    val topRaters = raters.sort(desc("count")).toDF().limit(10)
+    val joinedDF = topRaters.join(bookDF, Seq("ISBN"))
+    joinedDF.show(false)
+    joinedDF.printSchema()
+
+    //Collaborative Filtering using ALS (alternating least squares) Spark ML
+    import spark.sqlContext.implicits._
+    //create DF with userId as integer (ALS requires integer ids)
+    val stringindexer1 = new StringIndexer().setInputCol("userId").setOutputCol("userIdNum")
+    val modelc1 = stringindexer1.fit(bookRatingDF)
+    val bookRatingT1DF = modelc1.transform(bookRatingDF)
+    val stringindexer2 = new StringIndexer().setInputCol("ISBN").setOutputCol("isbnNum")
+    val modelc2 = stringindexer2.fit(bookRatingT1DF)
+    val bookRatingNewDF = modelc2.transform(bookRatingT1DF)
+    //TODO: save the userId -> userIdNum and ISBN -> isbnNum mappings in mongodb
+
+    val Array(training, test) = bookRatingNewDF.randomSplit(Array(0.8, 0.2))
+    // Build the recommendation model using ALS on the training data
+    val als = new ALS().setMaxIter(5).setRegParam(0.01).setUserCol("userIdNum").setItemCol("isbnNum").setRatingCol("rating")
+    val model = als.fit(training)
+    // Evaluate the model by computing the RMSE on the test data
+    // Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
+    model.setColdStartStrategy("drop")
+    val predictions = model.transform(test)
+    val evaluator = new RegressionEvaluator().setMetricName("rmse").setLabelCol("rating").setPredictionCol("prediction")
+    val rmse = evaluator.evaluate(predictions)
+    println(s"Root-mean-square error = $rmse")
+    //use the model to generate a set of 10 recommended books for each user in the dataset
+    //val docs = predictions.map( r => {
+    //  ( r.getAs("userId"), r.getAs("ISBN"), r.getAs("rating") )
+    //} )
+    //  .toDF( "userId", "ISBN", "rating" )
+
+    //println(s"Generate top 3 book recommendations for each user")
+    //val userRecs = model.recommendForAllUsers(3).show(truncate = false)
+    //println(s"Generate top 3 user recommendations for each book")
+    //val bookRecs = model.recommendForAllItems(3).show(truncate = false)
+
+    println(s"Generate top 3 book recommendations for a specified set of users (3)")
+    val users = bookRatingNewDF.select(als.getUserCol).distinct().limit(3)
+    val userSubsetRecs = model.recommendForUserSubset(users, 3).show(truncate = false)
+    println(s"Generate top 3 user recommendations for a specified set of books (3)")
+    val books = bookRatingNewDF.select(als.getItemCol).distinct().limit(3)
+    val booksSubSetRecs = model.recommendForItemSubset(books, 3).show(truncate = false)
+
+    // Directly from the prediction table: the where clause is for the case when we have a big DataFrame with many users
+    //model.transform(bookRatingNewDF.where('userId === givenUserId))
+    //  .select('isbn, 'prediction)
+    //  .orderBy('prediction.desc)
+    //  .limit(N)
+    //  .map { case Row(isbn: Int, prediction: Double) => (isbn, prediction) }
+    //  .collect()
+  }
+}
+
+
+
+
+

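The new file first builds a simple popularity list (the ten ISBNs with the most ratings, joined back to the Book collection) and then trains an ALS collaborative-filtering model, using StringIndexer to turn the string userId and ISBN columns into the numeric ids ALS requires. The recommendations it prints therefore contain the indexed ids, and the TODO in the file notes that the index mappings are not yet persisted. Below is a sketch, not part of the commit, of how the indexed recommendations could be translated back to the original ids with Spark ML's IndexToString, reusing the modelc1/modelc2 indexer models, the ALS model, and the users subset defined in the file; everything else is an assumption for illustration.

// Sketch only: map ALS's integer ids back to the original userId / ISBN strings.
import org.apache.spark.ml.feature.IndexToString
import org.apache.spark.sql.functions.{col, explode}

// recommendForUserSubset returns (userIdNum, recommendations: array<struct<isbnNum, rating>>)
val recs = model.recommendForUserSubset(users, 3)
  .withColumn("rec", explode(col("recommendations")))
  .select(
    col("userIdNum"),
    col("rec.isbnNum").cast("double").as("isbnNum"),
    col("rec.rating").as("predictedRating"))

// Reverse the indexing using the labels captured by the fitted StringIndexerModels.
val userIdBack = new IndexToString()
  .setInputCol("userIdNum").setOutputCol("userId").setLabels(modelc1.labels)
val isbnBack = new IndexToString()
  .setInputCol("isbnNum").setOutputCol("ISBN").setLabels(modelc2.labels)

isbnBack.transform(userIdBack.transform(recs))
  .select("userId", "ISBN", "predictedRating")
  .show(truncate = false)

Persisting modelc1.labels and modelc2.labels (or the two fitted StringIndexerModels themselves) would also satisfy the TODO about storing the mappings in MongoDB.
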
src/main/scala/org/repl/kafkasparkmongo/samples/SparkScalaMongo.scala

Lines changed: 69 additions & 1 deletion
@@ -45,6 +45,74 @@ object SparkScalaMongo {
     //Load data again to check if the insert was successful
     val studentsData = MongoSpark.load(spark)
     studentsData.show(false)
-
   }
+
+  //https://github.com/mongodb/mongo-spark/tree/master/examples/src/test/scala/tour
+
+  //Read
+  //val df3 = sparkSession.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://example.com/database.collection"))) // ReadConfig used for configuration
+  //val df4 = sparkSession.read.mongo() // SparkSession used for configuration
+  //sqlContext.read.format("mongo").load()
+  //usersDF = spark.read.mongo(ReadConfig(Map("uri" -> "mongodb://lms:[email protected]/lms_db.User")))
+  //MongoSpark.load[Character](sparkSession, ReadConfig(Map("collection" -> "hundredClub"), Some(ReadConfig(sparkSession)))).show()
+
+
+  //# Write to MongoDB
+  //#1
+  //sparkSession.sparkContext.parallelize(docs.map(Document.parse)).saveToMongoDB()
+  //#2
+  //sparkdf.write.option("collection", "hundredClub").mode("overwrite").mongo()
+  //sparkdf.write.option("collection", "hundredClub").format("mongo").save()
+  //sparkdf.write.format("com.mongodb.spark.sql.DefaultSource")
+  //  .mode("append").option("spark.mongodb.output.uri", "mongodb://host101:27017/dbName.collName")
+  //  .option("replaceDocument", "false")
+  //  .save()
+  //#3
+  //MongoSpark.save(df.write.mode("overwrite"), writeConfig) //drops the collection before writing the results, if the collection already exists
+
+
+
+
+  //query
+  //personDf.select($"_id", $"addresses"(0)("street"), $"country"("name"))
+  //val aggregatedRdd = uDF.withPipeline(Seq(Document.parse("{ '$match': { 'Book-Title' : {'$regex' : '^Sherlock' } } }")))
+
+  //mongodb
+  //db.zipcodes.aggregate( [
+  //  { $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
+  //  { $match: { totalPop: { $gte: 10*1000*1000 } } }
+  //] )
+  //spark equivalent
+  //println( "States with Populations above 10 Million" )
+  //import zipDf.sqlContext.implicits._ // 1)
+  //zipDf.groupBy("state")
+  //  .sum("pop")
+  //  .withColumnRenamed("sum(pop)", "count") // 2)
+  //  .filter($"count" > 10000000)
+  //  .show()
+
+  //* aggregate with connector
+  //val aggregatedRdd = uDF.withPipeline(Seq(Document.parse("{ '$match': { 'Book-Title' : {'$regex' : '^Sherlock' } } }")))
+
+  //BasicDBObject dateRange = new BasicDBObject("$gte", new Date(current.getYear(), current.getMonth(), current.getDate()));
+  //dateRange.put("$lt", new Date(current.getYear(), current.getMonth() - 1, current.getDate()));
+  //BasicDBObject query = new BasicDBObject("created_on", dateRange);
+  //OR: BasicDBObject query = new BasicDBObject("created_on", new BasicDBObject("$gte", new DateTime().toDate()).append("$lt", new DateTime().toDate()));
+  //rdd.withPipeline(singletonList(query));
+
+  //using predicate pushdown (filter and select)
+  //zipDf
+  //  .filter($"pop" > 0)
+  //  .select("state")
+  //  .explain(true)
+
+  //using sqlContext
+  //zipDf.createOrReplaceTempView("zips") // 1)
+  //zipDf.sqlContext.sql( // 2)
+  //  """SELECT state, sum(pop) AS count
+  //     FROM zips
+  //     GROUP BY state
+  //     HAVING sum(pop) > 10000000"""
+  //)
+  //  .show()
 }

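The block added above is a set of commented-out notes on reading, writing, and aggregating with the MongoDB Spark connector. Below is a runnable distillation of those notes, not part of the commit, against a hypothetical zipcodes collection with state and pop fields as used in the commented aggregation example; the connection URIs are placeholders, since the real ones in this repo are obscured as [email protected].

// Sketch only, assuming mongo-spark-connector 2.4.0 and Spark 2.4.3 as pinned in build.sbt.
import com.mongodb.spark._
import com.mongodb.spark.config.{ReadConfig, WriteConfig}
import org.apache.spark.sql.SparkSession
import org.bson.Document

val spark = SparkSession.builder
  .appName("MongoConnectorSketch")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Read a collection into a DataFrame (URI and collection are placeholders).
val readConfig = ReadConfig(Map("uri" -> "mongodb://localhost:27017/test.zipcodes"))
val zipDf = MongoSpark.load(spark, readConfig)

// DataFrame equivalent of the $group / $match pipeline in the notes above.
val bigStates = zipDf.groupBy("state")
  .sum("pop")
  .withColumnRenamed("sum(pop)", "count")
  .filter($"count" > 10000000)
bigStates.show()

// Push a $match stage down to MongoDB via the connector's aggregation pipeline.
val matched = MongoSpark.load(spark.sparkContext, readConfig)
  .withPipeline(Seq(Document.parse("{ '$match': { 'pop': { '$gt': 0 } } }")))
println(matched.count)

// Write the aggregated result to another collection (URI is a placeholder).
val writeConfig = WriteConfig(Map("uri" -> "mongodb://localhost:27017/test.statePopulation"))
MongoSpark.save(bigStates, writeConfig)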