diff --git a/README.md b/README.md
index c7ebb6e..420ca3f 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,17 @@ WebArchive URL Indexing
 
 This project contains several scripts (MapReduce jobs) for generating URL indexes of web archive collections, usually containing large numbers of WARC (or ARC) files. The scripts are designed to run on Hadoop or Amazon EMR to process terabytes or even petabytes of web archive content. Additionally, thanks to the flexibility of the MRJob library, the scripts can also run on a local machine to build an index cluster.
 
+The indexer was originally implemented based on [mrjob](//github.com/Yelp/mrjob). Because mrjob is no longer maintained, it was ported to PySpark (based on [cc-pyspark](//github.com/commoncrawl/cc-pyspark/)) in 2024/2025. For running the Spark jobs, see [run_index_ccpyspark.sh](run_index_ccpyspark.sh) and [requirements_ccpyspark.txt](requirements_ccpyspark.txt). See also the documentation of [cc-pyspark](//github.com/commoncrawl/cc-pyspark/).
+
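+For example, a sketch of indexing the 2016-44 crawl on a Spark/YARN cluster, assuming the environment expected by `run_index_ccpyspark.sh` (a `spark_env.sh` defining `SPARK_HOME`, and write access to the S3 buckets configured in the script); paths and bucket names are illustrative:
+
+```sh
+pip install -r requirements_ccpyspark.txt
+./run_index_ccpyspark.sh 2016-44 s3a://commoncrawl/crawl-data/CC-MAIN-2016-44/warc.paths
+```
+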
+The description below documents how to run the MapReduce jobs.
+
 ## Initial Setup and Usage
 
 Python 3 is required - see the branch `python-2.7` for a previous version running on Python 2.7 (not maintained anymore).
diff --git a/indexwarcs_cc_pyspark.py b/indexwarcs_cc_pyspark.py
new file mode 100644
index 0000000..2eec9ef
--- /dev/null
+++ b/indexwarcs_cc_pyspark.py
@@ -0,0 +1,86 @@
+from gzip import GzipFile
+from tempfile import TemporaryFile
+
+from pywb.indexer.cdxindexer import write_cdx_index
+
+from sparkcc import CCFileProcessorSparkJob
+
+
+class IndexWARCJob(CCFileProcessorSparkJob):
+    """This job receives as input a manifest of WARC/ARC files and produces
+    a CDX index per file.
+
+    pywb.indexer.cdxindexer is used to create the index, with a fixed set of options.
+    """
+
+    name = 'IndexWARCJob'
+
+    # description of input and output shown by --help
+    input_descr = "Path to file listing input paths (WARC/WAT/WET/ARC)"
+    output_descr = """Table containing the output CDX files
+(in spark.sql.warehouse.dir) and the indexing status:
+  1  successfully created,
+  0  already exists,
+ -1  processing failed"""
+
+    # PyWB index options
+    index_options = {
+        'surt_ordered': True,
+        'sort': True,
+        'cdxj': True,
+        #'minimal': True
+    }
+
+    def add_arguments(self, parser):
+        super(CCFileProcessorSparkJob, self).add_arguments(parser)
+        parser.add_argument("--output_base_url", required=True,
+                            help="Destination for CDX output.")
+        parser.add_argument("--skip_existing", action='store_true',
+                            help="Skip processing files for which "
+                                 "the output CDX file already exists.")
+
+    def _conv_warc_to_cdx_path(self, warc_path):
+        cdx_path = warc_path.replace('crawl-data', 'cc-index/cdx')
+        cdx_path = cdx_path.replace('.warc.gz', '.cdx.gz')
+        cdx_path = cdx_path.replace('.warc.wet.gz', '.wet.cdx.gz')
+        cdx_path = cdx_path.replace('.warc.wat.gz', '.wat.cdx.gz')
+        return cdx_path
+
+    def process_file(self, warc_path, tempfd):
+
+        cdx_path = self._conv_warc_to_cdx_path(warc_path)
+
+        self.get_logger().info('Indexing WARC: %s', warc_path)
+
+        if self.args.skip_existing and \
+                self.check_for_output_file(cdx_path, self.args.output_base_url):
+            self.get_logger().info('Already exists: %s', cdx_path)
+            yield cdx_path, 0
+            return
+
+        with TemporaryFile(mode='w+b',
+                           dir=self.args.local_temp_dir) as cdxtemp:
+
+            success = False
+            with GzipFile(fileobj=cdxtemp, mode='w+b') as cdxfile:
+                try:
+                    write_cdx_index(cdxfile, tempfd, warc_path, **self.index_options)
+                    success = True
+                except Exception as exc:
+                    self.get_logger().error('Failed to index %s: %s', warc_path, exc)
+
+            cdxtemp.flush()
+            cdxtemp.seek(0)
+
+            if success:
+                self.write_output_file(cdx_path, cdxtemp, self.args.output_base_url)
+                self.get_logger().info('Successfully uploaded CDX: %s', cdx_path)
+                yield cdx_path, 1
+            else:
+                yield cdx_path, -1
+
+
+if __name__ == "__main__":
+    job = IndexWARCJob()
+    job.run()
diff --git a/requirements_ccpyspark.txt b/requirements_ccpyspark.txt
new file mode 100644
index 0000000..9d3d21f
--- /dev/null
+++ b/requirements_ccpyspark.txt
@@ -0,0 +1,3 @@
+boto3
+# pywb is required to index WARC files (indexwarcs_cc_pyspark.py)
+pywb
diff --git a/run_index_ccpyspark.sh b/run_index_ccpyspark.sh
new file mode 100755
index 0000000..7f2d0a4
--- /dev/null
+++ b/run_index_ccpyspark.sh
@@ -0,0 +1,187 @@
+#!/bin/bash
+
+if [ $# -lt 2 ]; then
+    cat <<"EOF"
+$0 <year-week> <warc-manifest> [<split-file>]
+
+Create a Common Crawl CDX ZipNum index for a monthly crawl. All steps are run on Spark.
+
+  <year-week>      Year and week of the monthly crawl to be indexed, e.g. 2016-44,
+                   used to determine the final location of the index
+                     s3://commoncrawl/cc-index/collections/CC-MAIN-2016-44/...
+                   Locations for temporary files also include the crawl name.
+
+  <warc-manifest>  List of WARC file objects to be indexed, e.g., the WARC/WAT/WET list
+                     s3://commoncrawl/crawl-data/CC-MAIN-2016-44/warc.paths
+                   or any subset or union of multiple WARC listings (incl. robots.txt WARCs).
+                   Paths in the list must be keys/objects in the Common Crawl bucket
+                   or another bucket configured in this script (WARC_PREFIX).
+                   The path to the list must be an absolute URL on HDFS or S3A.
+
+                   The "index warcs" step is skipped if an empty string is passed as argument.
+                   Since 2018 the per-WARC CDX files are written directly by the Fetcher
+                   and include index fields combined from the WARC response and metadata record.
+                   The latter holds the detected language and charset.
+
+  <split-file>     Optional split file to be reused from a previous crawl with a similar
+                   distribution of URLs. If not given, splits are calculated and saved
+                   to the default split file path.
+
+EOF
+    exit 1
+fi
+
+
+YEARWEEK="$1"
+WARC_MANIFEST="$2"
+REUSE_SPLIT_FILE="$3"
+
+CRAWL="CC-MAIN-$YEARWEEK"
+
+echo "Generating cc-index for $CRAWL"
+echo
+echo WARC_MANIFEST="$WARC_MANIFEST"
+echo
+
+# Path prefix of WARC/WAT/WET files listed in WARC_MANIFEST
+WARC_PREFIX="s3://commoncrawl/"
+
+# AWS S3 bucket to hold CDX files
+WARC_CDX_BUCKET="commoncrawl-index-temp"
+WARC_CDX_PREFIX="s3://$WARC_CDX_BUCKET/"
+
+# Location of the CDX status table
+SPARK_SQL_WAREHOUSE="s3a://$WARC_CDX_BUCKET/$CRAWL"
+CDX_STATUS_TABLE="cdx_status"
+
+
+# Glob pattern to match all CDX files generated in step 1 (indexwarcs_cc_pyspark.py)
+# or available otherwise. The URI scheme must be supported by Hadoop / HDFS.
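+# The glob below assumes the per-WARC CDX files follow the layout
+# <crawl>/cdx/segments/<segment>/<subset>/<file>.cdx.gz; adjust it if the
+# CDX files were written to a different location.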
+WARC_CDX="s3a://$WARC_CDX_BUCKET/$CRAWL/cdx/segments/*/*/*.cdx.gz" + + +### ZipNum definitions +ZIPNUM_N_LINES=3000 +ZIPNUM_N_PARTITIONS=300 + +# SPLIT_FILE could be reused from previous crawl with similar distribution of URLs, see REUSE_SPLIT_FILE +SPLIT_FILE="s3a://$WARC_CDX_BUCKET/$CRAWL/partition_boundaries.json" +# if explicitely configured +if [ -n "$REUSE_SPLIT_FILE" ]; then + echo "Reusing SPLIT_FILE $REUSE_SPLIT_FILE" + SPLIT_FILE="$REUSE_SPLIT_FILE" +fi + +# temporary output path of part-n files of the zipnum job, concatenated into the cluster.idx +ZIPNUM_TEMP_DIR="s3://$WARC_CDX_BUCKET/$CRAWL/indexes/" + +# final path to ZipNum index files +ZIPNUM_CLUSTER_DIR="s3://commoncrawl/cc-index/collections/$CRAWL/indexes/" + + +# configure S3 buffer directory +# - must exist on task/compute nodes for buffering data +# - should provide several GBs of free space to hold temporarily +# the downloaded data (WARC, WAT, WET files to be indexed), +# only relevant for the indexwarcs_cc_pyspark job. +if [ -n "$S3_LOCAL_TEMP_DIR" ]; then + S3_LOCAL_TEMP_DIR="--local_temp_dir=$S3_LOCAL_TEMP_DIR" +else + S3_LOCAL_TEMP_DIR="" +fi + + + +### PySpark definitions +export PYSPARK_PYTHON="python" # or "python3" + +# Python dependencies (for simplicity, include all Python files: cc-pyspark/*.py) +PYFILES=sparkcc.py + +### Spark configuration + +SPARK_ON_YARN="--master yarn" +SPARK_HADOOP_OPTS="" +SPARK_EXTRA_OPTS="" + +# defines SPARK_HOME, SPARK_HADOOP_OPTS and HADOOP_CONF_DIR +. spark_env.sh + +NUM_EXECUTORS=${NUM_EXECUTORS:-1} +EXECUTOR_CORES=${EXECUTOR_CORES:-2} +# input partitions for the WARC-to-CDX stop +NUM_WARC_INPUT_PARTITIONS=${NUM_WARC_INPUT_PARTITIONS:-10} + +export LC_ALL=C + +set -e +set -x + + +if [ -n "$WARC_MANIFEST" ]; then + # Index WARC files in the manifest, write one CDX file per WARC + EXECUTOR_MEM=${EXECUTOR_MEM:-2g} + if [[ $NUM_WARC_INPUT_PARTITIONS -lt $((NUM_EXECUTORS*EXECUTOR_CORES)) ]]; then + echo "The number of input partitions is too low to utilize all executor cores" + exit 1 + fi + $SPARK_HOME/bin/spark-submit \ + $SPARK_ON_YARN \ + $SPARK_HADOOP_OPTS \ + --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ + --conf spark.task.maxFailures=5 \ + --conf spark.executor.memory=$EXECUTOR_MEM \ + --conf spark.driver.memory=3g \ + --conf spark.core.connection.ack.wait.timeout=600s \ + --conf spark.network.timeout=300s \ + --conf spark.shuffle.io.maxRetries=50 \ + --conf spark.shuffle.io.retryWait=600s \ + --conf spark.locality.wait=1s \ + --conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \ + --num-executors $NUM_EXECUTORS \ + --executor-cores $EXECUTOR_CORES \ + --executor-memory $EXECUTOR_MEM \ + --conf spark.sql.warehouse.dir="$SPARK_SQL_WAREHOUSE" \ + --py-files $PYFILES \ + indexwarcs_cc_pyspark.py \ + --input_base_url="$WARC_PREFIX" \ + --output_base_url="$WARC_CDX_PREFIX" \ + $S3_LOCAL_TEMP_DIR \ + --num_input_partitions=$NUM_WARC_INPUT_PARTITIONS \ + --num_output_partitions=1 \ + "$WARC_MANIFEST" "$CDX_STATUS_TABLE" +fi + + +### Create ZipNum index +EXECUTOR_MEM=${EXECUTOR_MEM:-3g} + +$SPARK_HOME/bin/spark-submit \ + $SPARK_ON_YARN \ + $SPARK_HADOOP_OPTS \ + --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \ + --conf spark.task.maxFailures=5 \ + --conf spark.executor.memory=$EXECUTOR_MEM \ + --conf spark.driver.memory=3g \ + --conf spark.core.connection.ack.wait.timeout=600s \ + --conf spark.network.timeout=300s \ + --conf spark.shuffle.io.maxRetries=50 \ + --conf spark.shuffle.io.retryWait=600s \ + --conf 
spark.locality.wait=1s \ + --conf spark.io.compression.codec=zstd \ + --conf spark.checkpoint.compress=true \ + --conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \ + --num-executors $NUM_EXECUTORS \ + --executor-cores $EXECUTOR_CORES \ + --executor-memory $EXECUTOR_MEM \ + --py-files $PYFILES \ + zipnumcluster_cc_pyspark.py \ + $S3_LOCAL_TEMP_DIR \ + --input_base_url="" \ + --output_base_url="$ZIPNUM_CLUSTER_DIR" \ + --temporary_output_base_url="$ZIPNUM_TEMP_DIR" \ + --partition_boundaries_file="$SPLIT_FILE" \ + --num_lines=$ZIPNUM_N_LINES \ + --num_output_partitions=$ZIPNUM_N_PARTITIONS \ + "$WARC_CDX" "" + + diff --git a/zipnumcluster_cc_pyspark.py b/zipnumcluster_cc_pyspark.py new file mode 100644 index 0000000..4508652 --- /dev/null +++ b/zipnumcluster_cc_pyspark.py @@ -0,0 +1,303 @@ +import argparse +import json +import logging +import os +import re +import zlib + +from typing import Iterator, Tuple + +import boto3 +import botocore + + +from sparkcc import CCFileProcessorSparkJob + + +class ZipNumClusterCdx(CCFileProcessorSparkJob): + """Spark job to create a ZipNum Sharded CDX index, see + . + The index is sharded over multiple partitions (default = 300). Each partition file + is compressed using gzip, but in chunks of 3000 lines (a configurable number). + Every chunk can be read separately, a jump index allows to find the right chunk + for a given key in a binary search. + """ + + name = 'ZipNumClusterCdx' + + input_descr = """Glob pattern of input CDX files, e.g., file:///path/*/*.cdx.gz +(HDFS-compatible filesystems only: hdfs://, s3a://, file://).""" + output_descr = "Ignored but required (can be empty, no output table is produced)." + + DATA_URL_PATTERN = re.compile('^(s3|https?|file|hdfs|s3a|s3n):(?://([^/]*))?/(.*)') + + + def add_arguments(self, parser): + super(CCFileProcessorSparkJob,self).add_arguments(parser) + parser.add_argument("--output_base_url", required=True, + help="Output destination.") + parser.add_argument("--partition_boundaries_file", required=True, + help="Full path to a JSON file containing partition boundaries. 
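+
+    # Illustrative cluster.idx entry written by write_partition_with_global_seq()
+    # (hypothetical values; fields after "<SURT key> <timestamp>" are tab-separated):
+    #   com,example)/path 20240101123456    cdx-00042.gz    163840    24576    12601
+    #   i.e. SURT key and timestamp of the first record of a gzip chunk, the shard
+    #   file name, byte offset and length of the chunk, and a global sequence number.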
" + "If specified, and does not exist, will be created, otherwise, " + "it will be used.") + parser.add_argument("--temporary_output_base_url", required=True, + help="Temporary output location for per-shard cluster indexes.") + parser.add_argument("--num_lines", type=int, required=False, + default=3000, + help="Number of lines to compress in each chunk") + parser.add_argument("--num_output_partitions", type=int, required=False, + default=300, + help="Number of partitions/shards") + # suppress help for ignored arguments + parser.add_argument("--output_format", help=argparse.SUPPRESS) + parser.add_argument("--output_compression", help=argparse.SUPPRESS) + parser.add_argument("--output_option", help=argparse.SUPPRESS) + + @staticmethod + def parse_line(line): + try: + parts = line.split(' ', 2) + if len(parts) != 3: + return None + surt_key, timestamp, json_str = parts + return ((surt_key, timestamp), json_str) + except: + return None + + @staticmethod + def get_partition_id(key: str, boundaries_data) -> int: + """Determine partition based on range boundaries""" + if not boundaries_data: + return 0 + + # Binary search to find the right partition + left = 0 + right = len(boundaries_data) + + while left < right: + mid = (left + right) // 2 + if mid == len(boundaries_data): + return mid + if key <= boundaries_data[mid]: + right = mid + else: + left = mid + 1 + + return left + + @staticmethod + def write_output_file(uri, fd, base_uri=None): + """ + Write data from stream fd to output file location defined per URI. + A static variant of CCFileProcessorSparkJob.write_output_file(...) + """ + uri_match = ZipNumClusterCdx.DATA_URL_PATTERN.match(uri) + if not uri_match and base_uri: + # relative input URI (path) and base URI defined + uri = base_uri + uri + uri_match = ZipNumClusterCdx.DATA_URL_PATTERN.match(uri) + if uri_match: + (scheme, netloc, path) = uri_match.groups() + else: + # keep local file paths as is + path = uri + scheme = 'file' + netloc = None + + if scheme in {'s3', 's3a', 's3n'}: + bucketname = netloc + output_path = path + try: + client = boto3.client('s3') + client.upload_fileobj(fd, bucketname, path) + except botocore.client.ClientError as exception: + logging.error( + 'Failed to write to S3 {}: {}'.format(output_path, exception)) + + elif scheme in {'http', 'https'}: + raise ValueError('HTTP/HTTPS output not supported') + + elif scheme == 'hdfs': + raise NotImplementedError('HDFS output not implemented') + + else: + logging.info('Writing local file {}'.format(uri)) + if scheme == 'file': + # must be an absolute path + uri = os.path.join('/', path) + else: + base_dir = os.path.abspath(os.path.dirname(__file__)) + uri = os.path.join(base_dir, uri) + os.makedirs(os.path.dirname(uri), exist_ok=True) + with open(uri, 'wb') as f: + f.write(fd.read()) + + @staticmethod + def write_partition_with_global_seq(idx: int, partition_iter: list, + records_per_partition: int, output_base_url: str): + partition_idx_file = f"idx-{idx:05d}.idx" + + # Calculate starting sequence number for this partition + start_seq = (idx * records_per_partition) + 1 if records_per_partition else 1 + + with open(partition_idx_file, 'w', encoding="utf-8") as f: + seq = start_seq + for record in partition_iter: + min_surt, _, min_surt_timestamp, filename, _, offset, length, _ = record + f.write(f"{min_surt} {min_surt_timestamp}\t{filename}\t{offset}\t{length}\t{seq}\n") + seq += 1 + + with open(partition_idx_file, 'rb') as fd: + ZipNumClusterCdx.write_output_file(partition_idx_file, fd, output_base_url) + + 
+        os.unlink(partition_idx_file)
+
+        return [(partition_idx_file, True)]
+
+    @staticmethod
+    def process_partition(partition_id: int,
+                          partition_iter: Iterator[Tuple[Tuple[str, str], str]],
+                          num_lines: int, output_base_url: str,
+                          temporary_output_base_url: str) -> Iterator[Tuple[str, bool]]:
+        """Process a partition with chunked compression and chunk boundary tracking"""
+        output_filename = f"cdx-{partition_id:05d}.gz"
+        index_entries = []
+        current_offset = 0
+        chunk_size = num_lines
+
+        current_chunk = []
+        chunk_min_surt = None
+        chunk_max_surt = None
+        chunk_min_timestamp = None
+
+        with open(output_filename, 'wb') as f:
+            for (surt_key, timestamp), json_data in partition_iter:
+                line = f"{surt_key} {timestamp} {json_data}\n"
+                if chunk_min_surt is None:
+                    chunk_min_surt = surt_key
+                    chunk_min_timestamp = timestamp
+                chunk_max_surt = surt_key  # will end up as the max since the data is sorted
+                current_chunk.append(line)
+
+                if len(current_chunk) >= chunk_size:
+                    # Compress and write the chunk
+                    chunk_data = ''.join(current_chunk).encode('utf-8')
+                    z = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
+                    compressed = z.compress(chunk_data) + z.flush()
+                    chunk_length = len(compressed)
+                    f.write(compressed)
+
+                    # Index entry with chunk boundaries
+                    index_entries.append((
+                        str(chunk_min_surt),       # min surt
+                        str(chunk_max_surt),       # max surt
+                        str(chunk_min_timestamp),  # capture time
+                        str(output_filename),      # filename
+                        int(partition_id),         # explicit integer conversion
+                        int(current_offset),       # explicit integer conversion
+                        int(chunk_length),         # explicit integer conversion
+                        int(len(current_chunk))    # number of records in the chunk
+                    ))
+
+                    current_offset += chunk_length
+                    current_chunk = []
+                    chunk_min_surt = None
+
+            # Handle the final chunk
+            if current_chunk:
+                chunk_data = ''.join(current_chunk).encode('utf-8')
+                z = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
+                compressed = z.compress(chunk_data) + z.flush()
+                chunk_length = len(compressed)
+                f.write(compressed)
+
+                index_entries.append((
+                    str(chunk_min_surt),       # min surt
+                    str(chunk_max_surt),       # max surt
+                    str(chunk_min_timestamp),  # capture time
+                    str(output_filename),
+                    int(partition_id),
+                    int(current_offset),
+                    int(chunk_length),
+                    int(len(current_chunk))
+                ))
+                current_chunk = []
+
+        with open(output_filename, 'rb') as fd:
+            ZipNumClusterCdx.write_output_file(output_filename, fd, output_base_url)
+
+        os.unlink(output_filename)
+
+        final_files = ZipNumClusterCdx.write_partition_with_global_seq(
+            partition_id, index_entries, num_lines, temporary_output_base_url)
+
+        return final_files
+
+    def run_job(self, session):
+        input_url = self.args.input_base_url + self.args.input
+        num_partitions = self.args.num_output_partitions
+        boundaries_file_uri = self.args.partition_boundaries_file
+        num_lines = self.args.num_lines
+        output_base_url = self.args.output_base_url
+        temporary_output_base_url = self.args.temporary_output_base_url
+
+        rdd = session.sparkContext.textFile(input_url).map(
+            self.parse_line).filter(lambda x: x is not None)
+
+        boundaries = None
+        self.get_logger(session).info(f"Boundaries file: {boundaries_file_uri}")
+        if boundaries_file_uri and self.check_for_output_file(boundaries_file_uri):
+            self.get_logger(session).info(
+                f"Boundaries file found, using it: {boundaries_file_uri}")
+            with self.fetch_file(boundaries_file_uri) as f:
+                boundaries = list(map(lambda l: tuple(l), json.load(f)))
+        else:
+            # The sample fraction needs to be pretty small, since the collect()
+            # brings the data back to the driver,
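+            # but it must still be large enough to estimate the key distribution;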
+            # 1/2 percent should be fine
+            samples = rdd.keys().sample(False, 0.005).collect()
+            samples.sort()
+
+            # Ensure a more even distribution by using quantiles
+            total_samples = len(samples)
+            boundaries = []
+            for i in range(1, num_partitions):
+                idx = (i * total_samples) // num_partitions
+                if idx < len(samples):
+                    boundaries.append(samples[idx])
+
+            temp_file_name = 'temp_range_boundaries.json'
+            with open(temp_file_name, 'w', encoding="utf-8") as f:
+                json.dump(boundaries, f)
+
+            with open(temp_file_name, 'rb') as f:
+                self.write_output_file(boundaries_file_uri, f)
+
+            os.unlink(temp_file_name)
+
+            self.get_logger(session).info(
+                f"Boundaries file created: {boundaries_file_uri}")
+
+        rdd = rdd.repartitionAndSortWithinPartitions(
+            numPartitions=num_partitions,
+            partitionFunc=lambda k: ZipNumClusterCdx.get_partition_id(k, boundaries)) \
+            .mapPartitionsWithIndex(
+                lambda idx, iter: ZipNumClusterCdx.process_partition(
+                    idx, iter, num_lines, output_base_url, temporary_output_base_url)) \
+            .collect()
+
+        # loop over the per-shard index files and concatenate them into a single cluster.idx
+        with open('cluster.idx', 'wb') as f:
+            for idx_file, _ in rdd:
+                with self.fetch_file(temporary_output_base_url + idx_file) as idx_fd:
+                    for line in idx_fd:
+                        f.write(line)
+
+        with open('cluster.idx', 'rb') as f:
+            self.write_output_file('cluster.idx', f, output_base_url)
+
+        os.unlink('cluster.idx')
+
+
+if __name__ == "__main__":
+    job = ZipNumClusterCdx()
+    job.run()