@@ -4,109 +4,37 @@ import org.apache.spark.sql.SparkSession
 object SparkTutorial {
   def main(args: Array[String]): Unit = {

-    // Turn off logging
     Logger.getLogger("org").setLevel(Level.OFF)
     Logger.getLogger("akka").setLevel(Level.OFF)
 
-    // --------------------------------------------------------------------------------------------------------
-    // Setting up a Spark Session
-    // --------------------------------------------------------------------------------------------------------
-
-    // Create a SparkSession to work with Spark
+    // Create SparkSession
     val sparkBuilder = SparkSession
       .builder()
       .appName("SparkTutorial")
-      .master("local[4]") // local, with 4 worker cores
+      .master("local[4]")
     val spark = sparkBuilder.getOrCreate()
 
-    // Set the default number of shuffle partitions (default is 200, which is too high for local deployment)
-    spark.conf.set("spark.sql.shuffle.partitions", "8") //
+    // Set the default number of shuffle partitions
+    spark.conf.set("spark.sql.shuffle.partitions", "8")
 
     // Importing implicit encoders for standard library classes and tuples that are used as Dataset types
     import spark.implicits._
 
-    println("-----------------------------------------------------------------------------------------------")
-
-    // --------------------------------------------------------------------------------------------------------
-    // Loading data
-    // --------------------------------------------------------------------------------------------------------
-
-    // Create a Dataset programmatically
-    val numbers = spark.createDataset((0 until 100).toList)
-
-    // Read a Dataset from a file
-    val customers = spark.read
+    // Read the Dataset from a file
+    val df = spark.read
       .option("inferSchema", "true")
       .option("header", "true")
       .option("sep", ";")
-      .csv("data/tpch_customer.csv") // also text, json, jdbc, parquet
-      .as[(Int, String, String, Int, String, String, String, String)]
-
-    println("-----------------------------------------------------------------------------------------------")
-
-    // --------------------------------------------------------------------------------------------------------
-    // Basic transformations
-    // --------------------------------------------------------------------------------------------------------
-
-    // Basic transformations on datasets return new datasets
-    val mapped = numbers.map(i => "This is a number: " + i)
-    val filtered = mapped.filter(s => s.contains("1"))
-    val sorted = filtered.sort()
-    List(numbers, mapped, filtered, sorted).foreach(dataset => println(dataset.getClass))
-    sorted.show()
-
-    println("-----------------------------------------------------------------------------------------------")
-
-    // Basic terminal operations
-    val collected = filtered.collect() // collects the entire dataset to the driver process
-    val reduced = filtered.reduce((s1, s2) => s1 + "," + s2) // reduces all values successively to one
-    filtered.foreach(s => println(s)) // performs an action for each element (take care where the action is evaluated!)
-    List(collected, reduced).foreach(result => println(result.getClass))
-
-    println("-----------------------------------------------------------------------------------------------")
-
-    // DataFrame and Dataset
-    val untypedDF = numbers.toDF() // DS to DF
-    val stringTypedDS = untypedDF.map(r => r.get(0).toString) // DF to DS via map
-    val integerTypedDS = untypedDF.as[Int] // DF to DS via the as() function that casts columns to concrete types
-    List(untypedDF, stringTypedDS, integerTypedDS).foreach(result => println(result.head.getClass))
-    List(untypedDF, stringTypedDS, integerTypedDS).foreach(result => println(result.head))
-
-    println("-----------------------------------------------------------------------------------------------")
-
-    // Mapping to tuples
-    numbers
-      .map(i => (i, "nonce", 3.1415, true))
-      .take(10)
-      .foreach(println(_))
-
-    println("-----------------------------------------------------------------------------------------------")
-
-    // SQL on DataFrames
-    customers.createOrReplaceTempView("customers") // make this dataframe visible as a table
-    val sqlResult = spark.sql("SELECT * FROM customers WHERE C_NATIONKEY = 15") // perform an SQL query on the table
-
-    import org.apache.spark.sql.functions._
+      .csv("data/test_customer.csv")
 
-    sqlResult // DF
-      .as[(Int, String, String, Int, String, String, String, String)] // DS
-      .sort(desc("C_NATIONKEY")) // desc() is a standard function from the spark.sql.functions package
-      .head(10)
-      .foreach(println(_))
+    // Collect the column names of the DataFrame
+    val columns = df.columns.toList
 
-    println("-----------------------------------------------------------------------------------------------")
+    // For each column, collect its distinct values on the driver and print them
+    columns.foreach(column => {
+      val names = df.select(column).distinct.map(_.get(0).toString).collect.toList
+      println(names)
+    })
 
-    // Grouping and aggregation for Datasets
-    // val topEarners = customers
-    //   .groupByKey { case (name, age, salary, company) => company }
-    //   .mapGroups { case (key, iterator) =>
-    //     val topEarner = iterator.toList.maxBy(t => t._3) // could be problematic: Why?
-    //     (key, topEarner._1, topEarner._3)
-    //   }
-    //   .sort(desc("_3"))
-    // topEarners.collect().foreach(t => println(t._1 + "'s top earner is " + t._2 + " with salary " + t._3))
 
-    println("-----------------------------------------------------------------------------------------------")
 
   }
 }
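
As a sanity check on the new logic, here is a minimal standalone sketch of what the file computes after this change. It is an illustration, not part of the commit: the object name DistinctValuesSketch is invented, and it assumes Spark with the log4j 1.x API on the classpath plus a semicolon-separated CSV with a header row at data/test_customer.csv, as in the diff.

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession

    object DistinctValuesSketch {
      def main(args: Array[String]): Unit = {
        // Silence Spark's logging, as the tutorial does
        Logger.getLogger("org").setLevel(Level.OFF)
        Logger.getLogger("akka").setLevel(Level.OFF)

        val spark = SparkSession.builder()
          .appName("DistinctValuesSketch")
          .master("local[4]") // local mode with 4 worker threads
          .getOrCreate()
        spark.conf.set("spark.sql.shuffle.partitions", "8") // the default of 200 is too high locally
        import spark.implicits._

        val df = spark.read
          .option("inferSchema", "true")
          .option("header", "true")
          .option("sep", ";")
          .csv("data/test_customer.csv")

        // One Spark job per column: deduplicate the column, collect the distinct
        // values to the driver, and print them (assumes no null cells, since
        // Row.get(0).toString would throw on null)
        df.columns.foreach { column =>
          val values = df.select(column).distinct.map(_.get(0).toString).collect.toList
          println(s"$column: $values")
        }

        spark.stop()
      }
    }

Note that this runs one distinct-plus-collect job per column, so for wide or large inputs, pulling every distinct value to the driver can get expensive.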