@@ -24,8 +24,6 @@ class ZipNumClusterCdx(CCFileProcessorSparkJob):
 
     name = 'ZipNumClusterCdx'
 
-    LOG = logging.getLogger('ZipNumClusterCdx')
-
     DATA_URL_PATTERN = re.compile('^(s3|https?|file|hdfs|s3a|s3n):(?://([^/]*))?/(.*)')
 
 
@@ -34,8 +32,9 @@ def add_arguments(self, parser):
         parser.add_argument("--output_base_url", required=True,
                             help="Output destination.")
         parser.add_argument("--partition_boundaries_file", required=True,
-                            help="Full path to a JSON file containing partition boundaries."
-                                 "If specified, and does not exist, will be created, otherwise, will be used.")
+                            help="Full path to a JSON file containing partition boundaries. "
+                                 "If specified and it does not exist, it will be created; "
+                                 "otherwise, it will be used.")
         parser.add_argument("--temporary_output_base_url", required=True,
                             help="Temporary output location for per-shard cluster indexes.")
         parser.add_argument("--num_lines", type=int, required=False,
@@ -93,25 +92,27 @@ def write_output_file(uri, fd, base_uri=None):
         else:
             # keep local file paths as is
             path = uri
+            scheme = 'file'
+            netloc = None
 
-        if scheme in ['s3', 's3a', 's3n']:
+        if scheme in {'s3', 's3a', 's3n'}:
             bucketname = netloc
             output_path = path
             try:
                 client = boto3.client('s3')
                 client.upload_fileobj(fd, bucketname, path)
             except botocore.client.ClientError as exception:
-                ZipNumClusterCdx.LOG.error(
+                logging.error(
                     'Failed to write to S3 {}: {}'.format(output_path, exception))
 
-        elif scheme == 'http' or scheme == 'https':
+        elif scheme in {'http', 'https'}:
             raise ValueError('HTTP/HTTPS output not supported')
 
         elif scheme == 'hdfs':
             raise NotImplementedError('HDFS output not implemented')
 
         else:
-            ZipNumClusterCdx.LOG.info('Writing local file {}'.format(uri))
+            logging.info('Writing local file {}'.format(uri))
             if scheme == 'file':
                 # must be an absolute path
                 uri = os.path.join('/', path)
@@ -130,7 +131,7 @@ def write_partition_with_global_seq(idx: int, partition_iter: list,
         # Calculate starting sequence number for this partition
         start_seq = (idx * records_per_partition) + 1 if records_per_partition else 1
 
-        with open(partition_idx_file, 'w') as f:
+        with open(partition_idx_file, 'w', encoding="utf-8") as f:
             seq = start_seq
             for record in partition_iter:
                 min_surt, _, min_surt_timestamp, filename, _, offset, length, _ = record
@@ -157,6 +158,7 @@ def process_partition(partition_id: int, partition_iter: Iterator[Tuple[str, Tup
         current_chunk = []
         chunk_min_surt = None
         chunk_max_surt = None
+        chunk_min_timestamp = None
 
         with open(output_filename, 'wb') as f:
             for (surt_key, timestamp), json_data in partition_iter:
@@ -222,25 +224,26 @@ def process_partition(partition_id: int, partition_iter: Iterator[Tuple[str, Tup
             return final_files
 
     def run_job(self, session):
-        input = self.args.input_base_url + self.args.input
+        input_url = self.args.input_base_url + self.args.input
         num_partitions = self.args.num_output_partitions
         boundaries_file_uri = self.args.partition_boundaries_file
         num_lines = self.args.num_lines
         output_base_url = self.args.output_base_url
         temporary_output_base_url = self.args.temporary_output_base_url
 
-        rdd = session.sparkContext.textFile(input).map(
+        rdd = session.sparkContext.textFile(input_url).map(
             self.parse_line).filter(lambda x: x is not None)
 
         boundaries = None
-        logging.info(f"Boundaries file: {boundaries_file_uri}")
+        self.get_logger(session).info(f"Boundaries file: {boundaries_file_uri}")
         if boundaries_file_uri and self.check_for_output_file(boundaries_file_uri):
-            logging.info(f"Boundaries file found, using it: {boundaries_file_uri}")
+            self.get_logger(session).info(f"Boundaries file found, using it: {boundaries_file_uri}")
             with self.fetch_file(boundaries_file_uri) as f:
                 boundaries = list(map(lambda l: tuple(l), json.load(f)))
 
         else:
-            # this percent needs to be pretty small, since this collect brings data back to driver...
+            # The percentage needs to be pretty small, since the collect
+            # brings data back to the driver...
             # 1/2 percent should be fine
             samples = rdd.keys().sample(False, 0.005).collect()
             samples.sort()
@@ -254,15 +257,16 @@ def run_job(self, session):
                 boundaries.append(samples[idx])
 
             temp_file_name = 'temp_range_boundaries.json'
-            with open(temp_file_name, 'w') as f:
+            with open(temp_file_name, 'w', encoding="utf-8") as f:
                 json.dump(boundaries, f)
 
             with open(temp_file_name, 'rb') as f:
                 self.write_output_file(boundaries_file_uri, f)
 
             os.unlink(temp_file_name)
 
-            logging.info(f"Boundaries file created: {boundaries_file_uri}")
+            self.get_logger(session).info(
+                f"Boundaries file created: {boundaries_file_uri}")
 
         rdd = rdd.repartitionAndSortWithinPartitions(
             numPartitions=num_partitions,
@@ -287,4 +291,4 @@ def run_job(self, session):
 
 if __name__ == "__main__":
     job = ZipNumClusterCdx()
-    job.run()
+    job.run()