1+ import json
2+
3+ from datetime import datetime
14from io import BytesIO
25from tempfile import SpooledTemporaryFile
36
7+ from fastwarc .warc import WarcRecordType
48from pyspark .sql .types import IntegerType , StructType , StructField , StringType
59from resiliparse_parser import HTMLParser
610from warcio import WARCWriter
@@ -29,6 +33,8 @@ class WETExtractor(CCFastWarcSparkJob, CCFileProcessorSparkJob):
2933 ]), True )
3034 ])
3135
36+ fastwarc_record_filter = WarcRecordType .warcinfo | WarcRecordType .response | WarcRecordType .metadata
37+
3238 def __init__ (self ):
3339 self .html_parser = HTMLParser ()
3440
@@ -50,20 +56,88 @@ def log_accumulators(self, session):
5056 self .log_accumulator (session , self .wet_records_written ,
5157 'WET records written = {}' )
5258
59+ def create_warcinfo_record (self , wet_writer , wet_file_name , record , content ):
60+ wet_headers = {
61+ 'Software-Info' :
62+ 'cc-pyspark wet_extractor.py based on FastWARC, Resiliparse, warcio' ,
63+ 'Extracted-Date' :
64+ datetime .utcnow ()
65+ }
66+ wet_headers_from_warc = {
67+ 'robots' , 'ispartof' , 'operator' , 'description' , 'publisher' }
68+ if self .is_warcinfo_record (record ):
69+ # Add warcinfo fields from source WARC's warcinfo record
70+ try :
71+ content = content .decode ('utf-8' )
72+ for line in content .split ('\r \n ' ):
73+ if ':' in line :
74+ name , value = line .split (':' , 1 )
75+ if name .lower () in wet_headers_from_warc :
76+ wet_headers [name ] = value .strip ()
77+ except Exception as e :
78+ self .get_logger ().error ('Error parsing warcinfo: %s' , e )
79+ return wet_writer .create_warcinfo_record (wet_file_name , wet_headers )
80+
81+ def create_and_write_wet_record (self , wet_writer , cache ):
82+ """Write WET record using the data in cache.
83+
84+ If there is a metadata record:
85+ - pass the identified character encoding to the HTML parser
86+ - extract identified languages metadata record and add as
87+ WARC header field "WARC-Identified-Content-Language
88+ """
89+ self .get_logger ().debug ('Converting %s' , cache ['uri' ])
90+ if 'response' not in cache :
91+ self .get_logger ().error ('No response record for %s' , cache ['uri' ])
92+ return
93+ elif cache ['response' ][1 ] is None :
94+ # no content because not HTML
95+ return
96+ encoding = None
97+ languages = None
98+ if 'metadata' in cache :
99+ try :
100+ content = cache ['metadata' ][1 ].decode ('utf-8' )
101+ for line in content .split ('\r \n ' ):
102+ if line .startswith ('charset-detected:' ):
103+ _ , value = line .split (':' , 1 )
104+ encoding = value .strip ()
105+ elif line .startswith ('languages-cld2:' ):
106+ _ , value = line .split (':' , 1 )
107+ lang_cld2 = json .loads (value .strip ())
108+ if 'languages' in lang_cld2 :
109+ languages = ',' .join (map (lambda l : l ['code-iso-639-3' ],
110+ lang_cld2 ['languages' ]))
111+ except Exception as e :
112+ self .get_logger ().error ('Error parsing metadata: %s' , e )
113+ html_tree = self .html_parser .get_html_tree (cache ['response' ][1 ],
114+ encoding = encoding )
115+ text = self .html_parser .html_to_text (html_tree )
116+ wet_headers_dict = {
117+ 'WARC-Date' : cache ['date' ],
118+ 'WARC-Refers-To' : cache ['id' ],
119+ 'Content-Type' : 'text/plain' ,
120+ }
121+ if languages :
122+ wet_headers_dict ['WARC-Identified-Content-Language' ] = languages
123+ wet_record = wet_writer .create_warc_record (
124+ cache ['uri' ], 'conversion' ,
125+ payload = BytesIO (str .encode (text , 'UTF-8' )),
126+ warc_headers_dict = wet_headers_dict )
127+ wet_writer .write_record (wet_record )
128+
53129 def process_record (self , record ):
130+ if self .is_warcinfo_record (record ) or self .is_metadata_record (record ):
131+ yield record , self .get_payload_stream (record ).read ()
54132 if not self .is_response_record (record ):
55- # skip over WARC request or metadata records
56- return ''
133+ # skip over WARC request records
134+ return
57135 if not self .is_html (record ):
136+ # non-HTML record: yield without content
58137 self .records_non_html .add (1 )
59- return ''
60- self .get_logger ().debug ('Converting %s' ,
61- self .get_warc_header (record , 'WARC-Target-URI' ))
62- page = self .get_payload_stream (record ).read ()
63- encoding = self .get_warc_header (record , 'WARC-Identified-Content-Charset' )
64- html_tree = self .html_parser .get_html_tree (page , encoding = encoding )
65- text = self .html_parser .html_to_text (html_tree )
66- yield record , text
138+ yield record , None
139+ return
140+ yield record , self .get_payload_stream (record ).read ()
67141
68142 def process_warc (self , uri , stream ):
69143 wet_file_name = uri .split ('/' )[- 1 ].replace ('.warc.gz' , '.warc.wet.gz' )
@@ -74,31 +148,37 @@ def process_warc(self, uri, stream):
74148 mode = 'w+b' ,
75149 dir = self .args .local_temp_dir ) as wet_temp :
76150 wet_writer = WARCWriter (wet_temp , gzip = True )
77- wet_writer .create_warcinfo_record (wet_file_name , {
78- 'Software-Info' :
79- 'cc-pyspark wet_extractor.py based on FastWARC, Resiliparse, warcio' ,
80- # TODO: Add more warcinfo fields from source WARC's warcinfo record
81- })
82151 offset = wet_temp .tell ()
152+ cache = {'uri' : None , 'date' : None }
83153 for res in super (WETExtractor , self ).process_warc (uri , stream ):
84- warc_target_uri = self .get_warc_header (res [0 ], 'WARC-Target-URI' )
85- warc_date = self .get_warc_header (res [0 ], 'WARC-Date' )
86- warc_record_id = self .get_warc_header (res [0 ], 'WARC-Record-ID' )
87- # TODO: extract language from following metadata record and add as
88- # WARC header field "WARC-Identified-Content-Language"
89- wet_record = wet_writer .create_warc_record (
90- warc_target_uri , 'conversion' ,
91- payload = BytesIO (str .encode (res [1 ], 'UTF-8' )),
92- warc_headers_dict = {
93- 'WARC-Date' : warc_date ,
94- 'WARC-Refers-To' : warc_record_id ,
95- 'Content-Type' : 'text/plain' ,
96- })
97- wet_writer .write_record (wet_record )
98- end_offset = wet_temp .tell ()
99- yield warc_target_uri , (wet_file_name , offset , (end_offset - offset ))
100- offset = end_offset
101- self .wet_records_written .add (1 )
154+ (record , content ) = res
155+ if offset == 0 :
156+ wet_writer .write_record (
157+ self .create_warcinfo_record (wet_writer , wet_file_name , record , content ))
158+ offset = wet_temp .tell ()
159+ if self .is_warcinfo_record (record ):
160+ continue
161+ warc_target_uri = self .get_warc_header (record , 'WARC-Target-URI' )
162+ warc_date = self .get_warc_header (record , 'WARC-Date' )
163+ if cache ['uri' ] and (cache ['uri' ] != warc_target_uri
164+ or cache ['date' ] != warc_date ):
165+ self .create_and_write_wet_record (wet_writer , cache )
166+ end_offset = wet_temp .tell ()
167+ yield cache ['uri' ], (wet_file_name , offset , (end_offset - offset ))
168+ offset = end_offset
169+ self .wet_records_written .add (1 )
170+ cache = {'uri' : None , 'date' : None }
171+ if cache ['uri' ] is None and cache ['date' ] is None :
172+ cache ['uri' ] = warc_target_uri
173+ cache ['date' ] = warc_date
174+ if self .is_response_record (record ):
175+ cache ['response' ] = (record , content )
176+ cache ['id' ] = self .get_warc_header (record , 'WARC-Record-ID' )
177+ elif self .is_metadata_record (record ):
178+ cache ['metadata' ] = (record , content )
179+ self .create_and_write_wet_record (wet_writer , cache )
180+ end_offset = wet_temp .tell ()
181+ yield cache ['uri' ], (wet_file_name , offset , (end_offset - offset ))
102182 wet_temp .seek (0 )
103183 self .write_output_file (wet_file_name , wet_temp , self .args .output_base_url )
104184
0 commit comments