
Commit 23e606d

Author: Cheng Hao
Add Data source examples
1 parent 55a62ba commit 23e606d

File tree: 2 files changed (+155 −0 lines)

datasources.md (+95 lines)
- Data source example

```scala
package org.scalatraining.datasource

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Row, GenericMutableRow}
import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.sources.{PrunedScan, BaseRelation, RelationProvider}

// Column generators: each one builds a single field value from the row index.
object BuildHelper {
  def idBuilder(idx: Int) = idx

  def userNameBuilder(idx: Int) = s"name_$idx"

  def passBuilder(idx: Int) = s"pass_$idx"
}

case class MyPrunedScan(count: Int, slices: Int)(@transient val sqlContext: SQLContext) extends PrunedScan {
  // Rough size estimate (~20 bytes per row) for the optimizer.
  override def sizeInBytes = 20 * count

  override def schema =
    StructType(
      StructField("uid", IntegerType, nullable = false) ::
      StructField("name", StringType, nullable = false) ::
      StructField("password", StringType, nullable = false) ::
      Nil)

  override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
    // One builder per requested column; columns pruned away by the optimizer
    // never show up here, so their values are never computed.
    val builders: Array[Int => Any] = requiredColumns.map {
      case "uid"      => BuildHelper.idBuilder _
      case "name"     => BuildHelper.userNameBuilder _
      case "password" => BuildHelper.passBuilder _
      case column     => sys.error(s"Cannot find the column $column")
    }

    // TODO: query the real databases here instead of generating rows.
    // A single mutable row is shipped in the closure and reused within each
    // partition; Spark SQL copies rows before buffering them, so this is safe.
    val row = new GenericMutableRow(requiredColumns.length)
    // Generate `count` rows spread over `slices` partitions.
    sqlContext.sparkContext.parallelize(1 to count, slices).map { i =>
      var idx = 0
      while (idx < builders.length) {
        row(idx) = builders(idx)(i)
        idx += 1
      }

      row
    }
  }
}

class MyPrunedScanProvider extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    MyPrunedScan(parameters("count").toInt, parameters("slices").toInt)(sqlContext)
  }
}
```
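
Once the jar is built and loaded into the shell (the next two steps), the relation can also be poked at directly, without going through SQL. A minimal sketch, assuming only the classes above and the shell's `sc`; note the `copy()`, needed because the scan reuses one mutable row per partition:

```scala
import org.apache.spark.sql.SQLContext
import org.scalatraining.datasource.MyPrunedScan

val sqlc = new SQLContext(sc)
val rel = MyPrunedScan(count = 100, slices = 3)(sqlc)

rel.schema       // StructType with uid, name, password
rel.sizeInBytes  // 2000, i.e. 20 * count

// Only the uid and name builders run for this scan; password is never built.
rel.buildScan(Array("uid", "name")).map(_.copy()).take(3).foreach(println)
```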

- Compile & make a jar

```shell
scalac -cp ~/spark/lib/spark-assembly-1.2.0-hadoop1.0.4.jar datasources.scala
jar -cvf ds.jar org
```
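
If invoking scalac against the assembly jar gets tedious, a minimal `build.sbt` works as well. A hypothetical sketch, assuming sbt 0.13 and the Spark 1.2.0 artifacts from Maven Central; `sbt package` then produces the jar to pass to `--jars`:

```scala
// build.sbt — hypothetical minimal build for this example.
name := "ds"
scalaVersion := "2.10.4" // Spark 1.2.0 is built against Scala 2.10
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.2.0" % "provided"
```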

- Start the Spark shell

```shell
~/spark/bin/spark-shell --jars ds.jar
```

- Create a temporary table

```scala
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)

import hc._

sql(
  """
    |CREATE TEMPORARY TABLE users
    |USING org.scalatraining.datasource.MyPrunedScanProvider
    |OPTIONS (
    |  count '100',
    |  slices '3'
    |)
  """.stripMargin)

sql("select uid, name from users limit 5").collect.foreach(println)
sql("explain extended select uid, name from users limit 5").collect.foreach(println)
```
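
The effect of `PrunedScan` is visible in what `buildScan` receives: in the query below only `password` is referenced, so the scan gets `requiredColumns = Array("password")` and the uid and name builders never run. Same shell session as above:

```scala
// Only the password builder is invoked for this projection.
sql("select password from users limit 3").collect.foreach(println)
```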

datasources.scala (+60 lines)
package org.scalatraining.datasource

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{Row, GenericMutableRow}
import org.apache.spark.sql.catalyst.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.sources.{PrunedScan, BaseRelation, RelationProvider}

// Column generators: each one builds a single field value from the row index.
object BuildHelper {
  def idBuilder(idx: Int) = idx

  def userNameBuilder(idx: Int) = s"name_$idx"

  def passBuilder(idx: Int) = s"pass_$idx"
}

case class MyPrunedScan(count: Int, slices: Int)(@transient val sqlContext: SQLContext) extends PrunedScan {
  // Rough size estimate (~20 bytes per row) for the optimizer.
  override def sizeInBytes = 20 * count

  override def schema =
    StructType(
      StructField("uid", IntegerType, nullable = false) ::
      StructField("name", StringType, nullable = false) ::
      StructField("password", StringType, nullable = false) ::
      Nil)

  override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
    // One builder per requested column; columns pruned away by the optimizer
    // never show up here, so their values are never computed.
    val builders: Array[Int => Any] = requiredColumns.map {
      case "uid"      => BuildHelper.idBuilder _
      case "name"     => BuildHelper.userNameBuilder _
      case "password" => BuildHelper.passBuilder _
      case column     => sys.error(s"Cannot find the column $column")
    }

    // TODO: query the real databases here instead of generating rows.
    // A single mutable row is shipped in the closure and reused within each
    // partition; Spark SQL copies rows before buffering them, so this is safe.
    val row = new GenericMutableRow(requiredColumns.length)
    // Generate `count` rows spread over `slices` partitions.
    sqlContext.sparkContext.parallelize(1 to count, slices).map { i =>
      var idx = 0
      while (idx < builders.length) {
        row(idx) = builders(idx)(i)
        idx += 1
      }

      row
    }
  }
}

class MyPrunedScanProvider extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    MyPrunedScan(parameters("count").toInt, parameters("slices").toInt)(sqlContext)
  }
}
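
For reference, the `USING` clause earlier resolves this provider class by name. Roughly (a sketch, not the exact Spark 1.2 internals), it amounts to instantiating the class via reflection and handing it the `OPTIONS` map, run here in the same shell session so `hc` is in scope:

```scala
// Approximately what CREATE TEMPORARY TABLE ... USING ... OPTIONS does:
// instantiate the named RelationProvider and pass the options as strings.
val provider = Class.forName("org.scalatraining.datasource.MyPrunedScanProvider")
  .newInstance().asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
val relation = provider.createRelation(hc, Map("count" -> "100", "slices" -> "3"))
```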
