Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions indexwarcsjob-ccpyspark.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
import logging
from sparkcc import CCFileProcessorSparkJob

from tempfile import TemporaryFile
from pywb.indexer.cdxindexer import write_cdx_index
from gzip import GzipFile


# Module-level logger shared by the job below; the logger name is the literal
# string 'IndexWARCJob' rather than the module's __name__.
LOG = logging.getLogger('IndexWARCJob')


#=============================================================================
class IndexWARCJob(CCFileProcessorSparkJob):
    """Produce one CDX index per WARC/ARC file listed in the input manifest.

    Every input file is indexed with ``pywb.indexer.cdxindexer.write_cdx_index``
    using the fixed options in ``index_options``. The index is gzip-compressed
    into a local temporary file and then handed to ``write_output_file`` under
    a path derived from the input path.
    """

    name = 'IndexWARCJob'

    # Keyword options forwarded unchanged to write_cdx_index().
    index_options = {
        'surt_ordered': True,
        'sort': True,
        'cdxj': True,
        #'minimal': True
    }

    def add_arguments(self, parser):
        """Register this job's command-line options on ``parser``."""
        # NOTE(review): super() is anchored at CCFileProcessorSparkJob, not at
        # IndexWARCJob, so the direct parent's own add_arguments() (if it has
        # one) is skipped. Left unchanged because switching it could register
        # --output_base_url twice -- confirm against sparkcc before changing.
        super(CCFileProcessorSparkJob, self).add_arguments(parser)
        parser.add_argument("--output_base_url", required=False,
                            default='my_cdx_bucket',
                            help="destination for cdx output")

        parser.add_argument("--skip-existing", dest='skip_existing',
                            action='store_true',
                            help="skip processing files that already have CDX")

    def _conv_warc_to_cdx_path(self, warc_path):
        """Return the CDX output path that corresponds to ``warc_path``.

        ``crawl-data`` becomes ``cc-index/cdx`` and the WARC/WET/WAT
        extensions are swapped for their ``.cdx.gz`` counterparts. These are
        plain substring replacements applied in order, so every occurrence
        in the path is rewritten, not only a trailing suffix.
        """
        cdx_path = warc_path.replace('crawl-data', 'cc-index/cdx')
        cdx_path = cdx_path.replace('.warc.gz', '.cdx.gz')
        cdx_path = cdx_path.replace('.warc.wet.gz', '.wet.cdx.gz')
        cdx_path = cdx_path.replace('.warc.wat.gz', '.wat.cdx.gz')
        return cdx_path

    def process_file(self, warc_path, tempfd):
        """Index a single input file and write the resulting CDX.

        Args:
            warc_path: path of the input file; also used to derive the
                output path and passed to ``write_cdx_index`` as the filename.
            tempfd: file object passed to ``write_cdx_index`` as the data
                to index.

        Yields:
            A single ``(cdx_path, status)`` pair, where status is ``0`` when
            the output already existed and was skipped, ``1`` when the index
            was written, and ``-1`` when indexing raised an exception.
        """
        cdx_path = self._conv_warc_to_cdx_path(warc_path)

        LOG.info('Indexing WARC: %s', warc_path)

        if self.args.skip_existing and \
                self.check_for_output_file(cdx_path, self.args.output_base_url):
            LOG.info('Already Exists: %s', cdx_path)
            yield cdx_path, 0
            return

        with TemporaryFile(mode='w+b',
                           dir=self.args.local_temp_dir) as cdxtemp:

            success = False
            with GzipFile(fileobj=cdxtemp, mode='w+b') as cdxfile:
                # Index to temp
                try:
                    write_cdx_index(cdxfile, tempfd, warc_path,
                                    **self.index_options)
                    success = True
                except Exception as exc:
                    # Best-effort per file: record the failure, including the
                    # full stack trace, and report it through the -1 status
                    # instead of aborting the whole job.
                    LOG.exception('Failed to index %s: %s', warc_path, exc)

            # The GzipFile is closed at this point, so the gzip trailer has
            # been written to cdxtemp. Rewind before handing the temporary
            # file over for upload.
            cdxtemp.flush()
            cdxtemp.seek(0)

            if success:
                self.write_output_file(cdx_path, cdxtemp,
                                       self.args.output_base_url)
                LOG.info('Successfully uploaded CDX: %s', cdx_path)
                yield cdx_path, 1
            else:
                yield cdx_path, -1


if __name__ == "__main__":
    # Script entry point: instantiate the job and hand control to run().
    IndexWARCJob().run()