-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask2.py
More file actions
49 lines (40 loc) · 1.51 KB
/
task2.py
File metadata and controls
49 lines (40 loc) · 1.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import sys
import json
from time import time
from pyspark import SparkContext
# Command-line arguments: input review file, output JSON path, and the
# partition count used for the customized run.
rev_filepath, put_filepath = sys.argv[1], sys.argv[2]
numb_part = int(sys.argv[3])

# Spark context with master "local[*]" and app name "Task2"; only
# ERROR-level log output is kept.
sc = SparkContext(master="local[*]", appName="Task2")
sc.setLogLevel("ERROR")

# Each line of the input file is parsed as one JSON document.
revs = sc.textFile(rev_filepath).map(json.loads)
def custom_part(key):
    """Return the partition index for *key*.

    The index is Python's built-in hash of the key reduced modulo the
    module-level ``numb_part``.
    """
    bucket = hash(key) % numb_part
    return int(bucket)
# --- Default partitioning ---------------------------------------------------
# Time the reviews-per-business count without specifying a partitioner.
default_start_time = time()
pairs_default = revs.map(lambda review: (review["business_id"], 1))
busi_counts_default = pairs_default.reduceByKey(lambda left, right: left + right)
# Ten entries ordered by count descending, then business_id ascending.
top_businesses_default = busi_counts_default.takeOrdered(
    10, key=lambda pair: (-pair[1], pair[0])
)
default_exe_time = time() - default_start_time

# Partition count and number of items held by each partition, for the report.
numb_part_default = busi_counts_default.getNumPartitions()
items_part_default = busi_counts_default.glom().map(len).collect()
# --- Customized partitioning ------------------------------------------------
# Time the same reviews-per-business count, with the (business_id, 1) pairs
# explicitly partitioned by custom_part into numb_part partitions.
custom_start_time = time()
revs_custom = revs.map(lambda line: (line["business_id"], 1)).partitionBy(
    numb_part, custom_part
)
# Hand reduceByKey the same partition count and function. Its default
# partition function would not match the partitioner installed above, which
# makes Spark shuffle the data a second time inside the timed region.
busi_counts_custom = revs_custom.reduceByKey(
    lambda a, b: a + b, numPartitions=numb_part, partitionFunc=custom_part
)
# Use the same action and ordering as the default run (count descending, then
# business_id ascending) so both timings measure comparable, deterministic work.
top_businesses_custom = busi_counts_custom.takeOrdered(
    10, key=lambda x: (-x[1], x[0])
)
custom_exe_time = time() - custom_start_time

# Partition count and number of items held by each partition, for the report.
numb_part_custom = busi_counts_custom.getNumPartitions()
items_part_custom = busi_counts_custom.glom().map(len).collect()
# Assemble the report: partition count, per-partition item counts, and elapsed
# time for each of the two runs.
default_stats = {
    "n_partition": numb_part_default,
    "n_items": items_part_default,
    "exe_time": default_exe_time,
}
customized_stats = {
    "n_partition": numb_part_custom,
    "n_items": items_part_custom,
    "exe_time": custom_exe_time,
}
output = {"default": default_stats, "customized": customized_stats}

# Write the report as JSON text to the path given on the command line.
with open(put_filepath, 'w') as out_file:
    out_file.write(json.dumps(output))

sc.stop()