
Commit 7c6e27d: added progs for chapter 5
Parent: 7af7324
6 files changed: 214 additions & 0 deletions
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
Each record has the following format:

<customer_id><,><year><,><transaction_id><,><transaction_value>
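
Not part of this commit, but as a minimal sketch of how a file in this format could be read with an explicit schema (instead of the inferSchema option used in the programs below). The column names and the "customers.txt" path come from this commit; everything else is illustrative:

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField,
                               StringType, IntegerType, DoubleType)

# schema matching <customer_id>,<year>,<transaction_id>,<transaction_value>
schema = StructType([
    StructField("customer_id", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("transaction_id", StringType(), True),
    StructField("transaction_value", DoubleType(), True)])

spark = SparkSession.builder.getOrCreate()
# customers.txt is the sample data file added in this commit
df = spark.read.schema(schema).csv("customers.txt")
df.printSchema()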

code/chap05/python/customers.txt

Lines changed: 10 additions & 0 deletions
@@ -0,0 +1,10 @@
c1,2019,T0011,20.67
c1,2019,T0012,12.34
c1,2019,T0013,44.30
c1,2018,T0001,20.67
c1,2018,T0002,12.34
c1,2018,T0003,44.30
c2,2019,T0017,744.30
c2,2019,T0018,820.67
c2,2018,T0022,182.34
c2,2018,T0033,494.30
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
export INPUT_PATH="customers.txt"
export OUTPUT_PATH="/tmp/partition_demo"
export PROG="partition_data_by_customer_and_year.py"
export SPARK_HOME=/book/spark-3.2.0
$SPARK_HOME/bin/spark-submit $PROG $INPUT_PATH $OUTPUT_PATH
input_path= customers.txt
output_path= /tmp/partition_demo

df:
+-----------+----+--------------+-----------------+
|customer_id|year|transaction_id|transaction_value|
+-----------+----+--------------+-----------------+
|c1         |2019|T0011         |20.67            |
|c1         |2019|T0012         |12.34            |
|c1         |2019|T0013         |44.3             |
|c1         |2018|T0001         |20.67            |
|c1         |2018|T0002         |12.34            |
|c1         |2018|T0003         |44.3             |
|c2         |2019|T0017         |744.3            |
|c2         |2019|T0018         |820.67           |
|c2         |2018|T0022         |182.34           |
|c2         |2018|T0033         |494.3            |
+-----------+----+--------------+-----------------+

df.schema:
root
|-- customer_id: string (nullable = true)
|-- year: integer (nullable = true)
|-- transaction_id: string (nullable = true)
|-- transaction_value: double (nullable = true)

df2:
+--------------+-----------------+-----------+----+
|transaction_id|transaction_value|customer_id|year|
+--------------+-----------------+-----------+----+
|T0011         |20.67            |c1         |2019|
|T0012         |12.34            |c1         |2019|
|T0013         |44.3             |c1         |2019|
|T0001         |20.67            |c1         |2018|
|T0002         |12.34            |c1         |2018|
|T0003         |44.3             |c1         |2018|
|T0017         |744.3            |c2         |2019|
|T0018         |820.67           |c2         |2019|
|T0022         |182.34           |c2         |2018|
|T0033         |494.3            |c2         |2018|
+--------------+-----------------+-----------+----+

df2.schema:
root
|-- transaction_id: string (nullable = true)
|-- transaction_value: double (nullable = true)
|-- customer_id: string (nullable = true)
|-- year: integer (nullable = true)


$ ls -1R /tmp/partition_demo/
_SUCCESS
customer_id=c1
customer_id=c2

/tmp/partition_demo//customer_id=c1:
year=2018
year=2019

/tmp/partition_demo//customer_id=c1/year=2018:
part-00000-8905097e-a6d3-4cb7-8b40-879073ec51bc.c000.snappy.parquet

/tmp/partition_demo//customer_id=c1/year=2019:
part-00000-8905097e-a6d3-4cb7-8b40-879073ec51bc.c000.snappy.parquet

/tmp/partition_demo//customer_id=c2:
year=2018
year=2019

/tmp/partition_demo//customer_id=c2/year=2018:
part-00000-8905097e-a6d3-4cb7-8b40-879073ec51bc.c000.snappy.parquet

/tmp/partition_demo//customer_id=c2/year=2019:
part-00000-8905097e-a6d3-4cb7-8b40-879073ec51bc.c000.snappy.parquet
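
The customer_id=.../year=... directories above are how Spark encodes the partition columns on disk. When the output directory is read back, those columns are rebuilt from the directory names, and a filter on them only scans the matching subdirectories (partition pruning). A minimal sketch, assuming the same /tmp/partition_demo output path as above:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
# partition columns (customer_id, year) are recovered from the directory names
df = spark.read.parquet("/tmp/partition_demo")
# this filter should only touch the customer_id=c1/year=2019 directory
df.where((col("customer_id") == "c1") & (col("year") == 2019)).show()
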
Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
#!/usr/bin/env python
#-----------------------------------------------------
# 1. Read customers.txt
# 2. Create a DataFrame with 4 columns:
#    { <customer_id>,
#      <year>,
#      <transaction_id>,
#      <transaction_value> }
# 3. Partition data by (<customer_id>, <year>)
#-------------------------------------------------------
# @author Mahmoud Parsian
#-------------------------------------------------------
from __future__ import print_function
import sys
from pyspark.sql import SparkSession

# define input path
input_path = sys.argv[1]
print("input_path=", input_path)

# define output path for partitioned data
output_path = sys.argv[2]
print("output_path=", output_path)

# create a SparkSession object
spark = SparkSession.builder.getOrCreate()


# create a DataFrame; note that toDF() returns a
# new DataFrame with the specified column names
# columns = ('customer_id', 'year', 'transaction_id', 'transaction_value')
df = spark.read.option("inferSchema", "true")\
    .csv(input_path)\
    .toDF('customer_id', 'year', 'transaction_id', 'transaction_value')
#
df.show(truncate=False)
df.printSchema()
#
# partition data by 'customer_id', and then by 'year';
# each partition will have one or more files
df.write.partitionBy('customer_id', 'year')\
    .parquet(output_path)

# read the partitioned data back into another DataFrame
df2 = spark.read.parquet(output_path)
df2.show(truncate=False)
df2.printSchema()

# done!
spark.stop()
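
A possible follow-up (not part of this program): a single partition directory can also be read directly by path; the basePath option tells Spark where partition discovery starts, so customer_id and year remain available as columns. The paths below assume the /tmp/partition_demo output used in the sample run:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# read only customer c1; basePath keeps customer_id and year as columns
df_c1 = spark.read.option("basePath", "/tmp/partition_demo")\
    .parquet("/tmp/partition_demo/customer_id=c1")
df_c1.show(truncate=False)
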
Lines changed: 13 additions & 0 deletions
@@ -0,0 +1,13 @@
#-----------------------------------------------------
# This is a shell script to run the following program:
#    partition_data_by_customer_and_year.py
#-----------------------------------------------------
# @author Mahmoud Parsian
#-----------------------------------------------------
export SPARK_HOME="/book/spark-3.2.0"
export INPUT_PATH="/book/code/chap05/customers.txt"
export OUTPUT_PATH="/tmp/partition_demo"
export SPARK_PROG="/book/code/chap05/partition_data_by_customer_and_year.py"
#
# run the PySpark program:
$SPARK_HOME/bin/spark-submit $SPARK_PROG $INPUT_PATH $OUTPUT_PATH
Lines changed: 59 additions & 0 deletions
@@ -0,0 +1,59 @@
#!/usr/bin/env python
#-----------------------------------------------------
#
# NOTE:
#   This solution creates a SINGLE FILE
#   per created partition
#
#-----------------------------------------------------
# 1. Read customers.txt
# 2. Create a DataFrame with 4 columns:
#    { <customer_id>,
#      <year>,
#      <transaction_id>,
#      <transaction_value> }
# 3. Partition data by (<customer_id>, <year>)
#-------------------------------------------------------
# @author Mahmoud Parsian
#-------------------------------------------------------
from __future__ import print_function
import sys
from pyspark.sql import SparkSession

# define input path
input_path = sys.argv[1]
print("input_path=", input_path)

# define output path for partitioned data
output_path = sys.argv[2]
print("output_path=", output_path)

# create a SparkSession object
spark = SparkSession.builder.getOrCreate()


# create a DataFrame; note that toDF() returns a
# new DataFrame with the specified column names
# columns = ('customer_id', 'year', 'transaction_id', 'transaction_value')
df = spark.read.option("inferSchema", "true")\
    .csv(input_path)\
    .toDF('customer_id', 'year', 'transaction_id', 'transaction_value')
#
df.show(truncate=False)
df.printSchema()
#
# partition data by 'customer_id', and then by 'year',
# and create a SINGLE FILE per created partition.
# DataFrame.repartition('customer_id', 'year') guarantees
# a single file per partition.
df.repartition('customer_id', 'year')\
    .write.partitionBy('customer_id', 'year')\
    .parquet(output_path)

# read the partitioned data back into another DataFrame
df2 = spark.read.parquet(output_path)
df2.show(truncate=False)
df2.printSchema()

# done!
spark.stop()
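
As a quick sanity check (a sketch, not part of this commit), the single-file-per-partition claim can be verified with plain Python by counting part files in each customer_id=*/year=* leaf directory; the output path below assumes /tmp/partition_demo:

import glob
import os

output_path = "/tmp/partition_demo"
# each customer_id=*/year=* directory should contain exactly one part file
for leaf in sorted(glob.glob(os.path.join(output_path, "customer_id=*", "year=*"))):
    parts = glob.glob(os.path.join(leaf, "part-*.parquet"))
    print(leaf, "->", len(parts), "part file(s)")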
