35 commits
48a3672
Add an alternate spark version of indexwarcsjob (without mrjob)
jt55401 Sep 10, 2024
406b63f
fix so we properly can skip fully failed files and log them.
jt55401 Nov 8, 2024
65bf452
feat: draft version of zipnum cluster job as spark (untested)
jt55401 Nov 8, 2024
fcb5695
docs: adding note about zipnum testing and behaviour
jt55401 Nov 8, 2024
e6bb313
feat: reservoir sampled method of zipnum cluster job
jt55401 Nov 20, 2024
4a99403
fix: simpler ordered partition id
jt55401 Nov 20, 2024
e8dcf78
fix: re-init and flush gz
jt55401 Nov 20, 2024
2a1de31
fix: bugfix for edge conditions
jt55401 Nov 20, 2024
951ede4
fix: getting order and structure of final index proper, and in single…
jt55401 Nov 20, 2024
d3f5ee9
fix: fix cdx filenames
jt55401 Nov 20, 2024
3f876a8
fix: final cluster.idx logic
jt55401 Nov 20, 2024
688e879
fix: get zipnumcluster-pyspark working with s3/emr
jt55401 Nov 27, 2024
7299693
chore: merged latest normal spark solution upwards
jt55401 Nov 27, 2024
1ce6690
chore: merging other changes
jt55401 Nov 27, 2024
9c0f47b
fix: numerous bugfixes from complex merge issues
jt55401 Nov 27, 2024
1d9a857
fix: more emr bugfixes and tweaks
jt55401 Nov 27, 2024
55f907c
fix: temporary working version of zipnum job using normal spark feat…
jt55401 Dec 2, 2024
c0c454b
chore: going back to using CCFileProcessorSparkJob
jt55401 Dec 3, 2024
0f0b6d4
fix: bugfixes found from integration testing
jt55401 Dec 16, 2024
30b3ce7
fix: spelling of boundaries
jt55401 Dec 20, 2024
91a18e7
fix: marking args as required
jt55401 Dec 20, 2024
fcd02cf
fix: remove keyfunc, it isn't needed (and was causing issue that sort…
jt55401 Dec 20, 2024
3f60a21
chore: adding TODO for upcoming tasks
jt55401 Jan 27, 2025
a1f8553
fix: code formatting and comments
sebastian-nagel Nov 13, 2025
014d702
fix: sort by SURT key *and* capture timestamp
sebastian-nagel Nov 14, 2025
0ce27c8
refactor: rename Python files to valid module names
sebastian-nagel Nov 14, 2025
7779a47
refactor: move functions out of global scope
sebastian-nagel Nov 17, 2025
64a2e11
fix: do not create output directory tree in local filesystem
sebastian-nagel Nov 17, 2025
136f1ae
fix: type error when using partition boundary file
sebastian-nagel Nov 17, 2025
7b0f5cc
refactor: code documentation, formatting and cleanup
sebastian-nagel Nov 17, 2025
8394c74
fix: use correct timestamp in cluster.idx
sebastian-nagel Nov 17, 2025
02be757
refactor: persistence and temporary files
sebastian-nagel Nov 17, 2025
beac6c8
fix: unify logging, address Pylint warnings
sebastian-nagel Nov 18, 2025
a9ff183
fix: suppress ignored CLI options
sebastian-nagel Nov 18, 2025
af10f62
Spark jobs to build index: README, add shell script and requirements
sebastian-nagel Nov 18, 2025
4 changes: 4 additions & 0 deletions README.md
@@ -3,6 +3,10 @@ WebArchive URL Indexing

This project contains several scripts (MapReduce jobs) for generating URL indexes of web archive collections, usually containing a large number of WARC (or ARC) files. The scripts are designed to run on Hadoop or Amazon EMR to process terabytes or even petabytes of web archive content. Additionally, thanks to the flexibility of the MRJob library, the scripts can also run on a local machine to build an index cluster.

The indexer was originally implemented based on [mrjob](//github.com/Yelp/mrjob). Because mrjob is no longer maintained, it was ported to PySpark (based on [cc-pyspark](//github.com/commoncrawl/cc-pyspark/)) in 2024/2025. For running the Spark jobs, see [run_index_ccpyspark.sh](run_index_ccpyspark.sh) and [requirements_ccpyspark.txt](requirements_ccpyspark.txt). See also the documentation of [cc-pyspark](//github.com/commoncrawl/cc-pyspark/).
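
For illustration, a complete index build can then be launched with a single call of the wrapper script. The invocation below is a sketch: the crawl ID and manifest URL are placeholders taken from the usage message of `run_index_ccpyspark.sh`.

```sh
# Index the WARC files of crawl CC-MAIN-2016-44 and build the ZipNum cluster index.
# The manifest path must be an absolute HDFS or s3a:// URL.
./run_index_ccpyspark.sh 2016-44 s3a://commoncrawl/crawl-data/CC-MAIN-2016-44/warc.paths
```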

The description below documents how to run the MapReduce jobs.

## Initial Setup and Usage

Python 3 is required - see the branch `python-2.7` for a previous version running on Python 2.7 (not maintained anymore).
86 changes: 86 additions & 0 deletions indexwarcs_cc_pyspark.py
@@ -0,0 +1,86 @@
from gzip import GzipFile
from tempfile import TemporaryFile

from pywb.indexer.cdxindexer import write_cdx_index

from sparkcc import CCFileProcessorSparkJob



class IndexWARCJob(CCFileProcessorSparkJob):
""" This job receives as input a manifest of WARC/ARC files and produces
a CDX index per file

The pywb.indexer.cdxindexer is used to create the index, with a fixed set of options
"""

name = 'IndexWARCJob'

# description of input and output shown by --help
input_descr = "Path to file listing input paths (WARC/WAT/WET/ARC)"
output_descr = """Table containing the output CDX files
(in spark.sql.warehouse.dir) and the indexing status:
1 successfully created,
0 already exists,
-1 processing failed"""

# PyWB index options
index_options = {
'surt_ordered': True,
'sort': True,
'cdxj': True,
#'minimal': True
}

def add_arguments(self, parser):
super(CCFileProcessorSparkJob, self).add_arguments(parser)
parser.add_argument("--output_base_url", required=True,
help="Destination for CDX output.")
parser.add_argument("--skip_existing", action='store_true',
help="Skip processing files for which "
"the output CDX file already exists.")

    def _conv_warc_to_cdx_path(self, warc_path):
        cdx_path = warc_path.replace('crawl-data', 'cc-index/cdx')
        cdx_path = cdx_path.replace('.warc.gz', '.cdx.gz')
        cdx_path = cdx_path.replace('.warc.wet.gz', '.wet.cdx.gz')
        cdx_path = cdx_path.replace('.warc.wat.gz', '.wat.cdx.gz')
        return cdx_path

    def process_file(self, warc_path, tempfd):

        cdx_path = self._conv_warc_to_cdx_path(warc_path)

        self.get_logger().info('Indexing WARC: %s', warc_path)

        if self.args.skip_existing and \
                self.check_for_output_file(cdx_path, self.args.output_base_url):
            self.get_logger().info('Already Exists: %s', cdx_path)
            yield cdx_path, 0
            return

        with TemporaryFile(mode='w+b',
                           dir=self.args.local_temp_dir) as cdxtemp:

            success = False
            with GzipFile(fileobj=cdxtemp, mode='w+b') as cdxfile:
                try:
                    write_cdx_index(cdxfile, tempfd, warc_path, **self.index_options)
                    success = True
                except Exception as exc:
                    self.get_logger().error('Failed to index %s: %s', warc_path, exc)

            cdxtemp.flush()
            cdxtemp.seek(0)

            if success:
                self.write_output_file(cdx_path, cdxtemp, self.args.output_base_url)
                self.get_logger().info('Successfully uploaded CDX: %s', cdx_path)
                yield cdx_path, 1
            else:
                yield cdx_path, -1


if __name__ == "__main__":
    job = IndexWARCJob()
    job.run()
3 changes: 3 additions & 0 deletions requirements_ccpyspark.txt
@@ -0,0 +1,3 @@
boto3
# PyWB is required to index WARC files, running indexwarcs_cc_pyspark.py
pywb
187 changes: 187 additions & 0 deletions run_index_ccpyspark.sh
@@ -0,0 +1,187 @@
#!/bin/bash

if [ $# -lt 2 ]; then
cat <<"EOF"
$0 <year-week-of-crawl> <path-to-warc-file-list> [<split_file>]

Create a Common Crawl CDX ZipNum index for a monthly crawl. All steps are run on Spark.

<year-week-of-crawl> Year and week of the monthly crawl to be indexed, e.g. 2016-44
used to determine the final location of the index
s3://commoncrawl/cc-index/collections/CC-MAIN-2016-44/...
The locations of temporary files also include the crawl name.

<path-to-warc-file-list> List of WARC file objects to be indexed, e.g., the WARC/WAT/WET list
s3://commoncrawl/crawl-data/CC-MAIN-2016-44/warc.paths
or any subset or union of multiple WARC listings (incl. robots.txt WARCs).
Paths in the list must be keys/objects in the Common Crawl bucket
or another bucket configured in this script (WARC_PREFIX).
The path to the list must be an absolute URL on HDFS or S3A.

The "index warcs" step is skipped if an empty string is passed as argument.
Since 2018 the per-WARC CDX files are written directly by the Fetcher
and include index fields combined from the WARC response and metadata record.
The latter holds the detected language and charset.

<split_file> Optional split file to be reused from a previous crawl with a similar distribution of URLs.
If not given, splits are calculated and saved to the default split file path.

EOF
exit 1
fi
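
# Example invocation (illustrative; the crawl IDs and paths below are placeholders):
#   ./run_index_ccpyspark.sh 2016-50 s3a://commoncrawl/crawl-data/CC-MAIN-2016-50/warc.paths \
#       s3a://commoncrawl-index-temp/CC-MAIN-2016-44/partition_boundaries.json
# The optional third argument reuses the partition boundaries computed for a previous crawl.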


YEARWEEK="$1"
WARC_MANIFEST="$2"
REUSE_SPLIT_FILE="$3"

CRAWL="CC-MAIN-$YEARWEEK"

echo "Generating cc-index for $CRAWL"
echo
echo WARC_MANIFEST="$WARC_MANIFEST"
echo

# Path prefix of WARC/WAT/WET files listed in WARC_MANIFEST
WARC_PREFIX="s3://commoncrawl/"

# AWS S3 bucket to hold CDX files
WARC_CDX_BUCKET="commoncrawl-index-temp"
WARC_CDX_PREFIX="s3://$WARC_CDX_BUCKET/"

# Location of the CDX status table
SPARK_SQL_WAREHOUSE="s3a://$WARC_CDX_BUCKET/$CRAWL"
CDX_STATUS_TABLE="cdx_status"


# glob pattern to match all CDX files generated in step 1 (indexwarcs_cc_pyspark.py)
# or available otherwise. The URI scheme must be supported by Hadoop / HDFS.
WARC_CDX="s3a://$WARC_CDX_BUCKET/$CRAWL/cdx/segments/*/*/*.cdx.gz"


### ZipNum definitions
ZIPNUM_N_LINES=3000
ZIPNUM_N_PARTITIONS=300

# SPLIT_FILE can be reused from a previous crawl with a similar distribution of URLs, see REUSE_SPLIT_FILE
SPLIT_FILE="s3a://$WARC_CDX_BUCKET/$CRAWL/partition_boundaries.json"
# if explicitly configured
if [ -n "$REUSE_SPLIT_FILE" ]; then
echo "Reusing SPLIT_FILE $REUSE_SPLIT_FILE"
SPLIT_FILE="$REUSE_SPLIT_FILE"
fi

# temporary output path of part-n files of the zipnum job, concatenated into the cluster.idx
ZIPNUM_TEMP_DIR="s3://$WARC_CDX_BUCKET/$CRAWL/indexes/"

# final path to ZipNum index files
ZIPNUM_CLUSTER_DIR="s3://commoncrawl/cc-index/collections/$CRAWL/indexes/"


# configure S3 buffer directory
# - must exist on task/compute nodes for buffering data
# - should provide several GBs of free space to temporarily hold
#   the downloaded data (WARC, WAT, WET files to be indexed);
#   only relevant for the indexwarcs_cc_pyspark job.
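#   For example (illustrative): export S3_LOCAL_TEMP_DIR=/mnt/tmp before calling this script.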
if [ -n "$S3_LOCAL_TEMP_DIR" ]; then
S3_LOCAL_TEMP_DIR="--local_temp_dir=$S3_LOCAL_TEMP_DIR"
else
S3_LOCAL_TEMP_DIR=""
fi



### PySpark definitions
export PYSPARK_PYTHON="python" # or "python3"

# Python dependencies (for simplicity, include all Python files: cc-pyspark/*.py)
PYFILES=sparkcc.py

### Spark configuration

SPARK_ON_YARN="--master yarn"
SPARK_HADOOP_OPTS=""
SPARK_EXTRA_OPTS=""

# defines SPARK_HOME, SPARK_HADOOP_OPTS and HADOOP_CONF_DIR
. spark_env.sh

NUM_EXECUTORS=${NUM_EXECUTORS:-1}
EXECUTOR_CORES=${EXECUTOR_CORES:-2}
# input partitions for the WARC-to-CDX step
NUM_WARC_INPUT_PARTITIONS=${NUM_WARC_INPUT_PARTITIONS:-10}

export LC_ALL=C

set -e
set -x


if [ -n "$WARC_MANIFEST" ]; then
# Index WARC files in the manifest, write one CDX file per WARC
EXECUTOR_MEM=${EXECUTOR_MEM:-2g}
if [[ $NUM_WARC_INPUT_PARTITIONS -lt $((NUM_EXECUTORS*EXECUTOR_CORES)) ]]; then
echo "The number of input partitions is too low to utilize all executor cores"
exit 1
fi
$SPARK_HOME/bin/spark-submit \
$SPARK_ON_YARN \
$SPARK_HADOOP_OPTS \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.task.maxFailures=5 \
--conf spark.executor.memory=$EXECUTOR_MEM \
--conf spark.driver.memory=3g \
--conf spark.core.connection.ack.wait.timeout=600s \
--conf spark.network.timeout=300s \
--conf spark.shuffle.io.maxRetries=50 \
--conf spark.shuffle.io.retryWait=600s \
--conf spark.locality.wait=1s \
--conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \
--num-executors $NUM_EXECUTORS \
--executor-cores $EXECUTOR_CORES \
--executor-memory $EXECUTOR_MEM \
--conf spark.sql.warehouse.dir="$SPARK_SQL_WAREHOUSE" \
--py-files $PYFILES \
indexwarcs_cc_pyspark.py \
--input_base_url="$WARC_PREFIX" \
--output_base_url="$WARC_CDX_PREFIX" \
$S3_LOCAL_TEMP_DIR \
--num_input_partitions=$NUM_WARC_INPUT_PARTITIONS \
--num_output_partitions=1 \
"$WARC_MANIFEST" "$CDX_STATUS_TABLE"
fi


### Create ZipNum index
EXECUTOR_MEM=${EXECUTOR_MEM:-3g}

$SPARK_HOME/bin/spark-submit \
$SPARK_ON_YARN \
$SPARK_HADOOP_OPTS \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--conf spark.task.maxFailures=5 \
--conf spark.executor.memory=$EXECUTOR_MEM \
--conf spark.driver.memory=3g \
--conf spark.core.connection.ack.wait.timeout=600s \
--conf spark.network.timeout=300s \
--conf spark.shuffle.io.maxRetries=50 \
--conf spark.shuffle.io.retryWait=600s \
--conf spark.locality.wait=1s \
--conf spark.io.compression.codec=zstd \
--conf spark.checkpoint.compress=true \
--conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \
--num-executors $NUM_EXECUTORS \
--executor-cores $EXECUTOR_CORES \
--executor-memory $EXECUTOR_MEM \
--py-files $PYFILES \
zipnumcluster_cc_pyspark.py \
$S3_LOCAL_TEMP_DIR \
--input_base_url="" \
--output_base_url="$ZIPNUM_CLUSTER_DIR" \
--temporary_output_base_url="$ZIPNUM_TEMP_DIR" \
--partition_boundaries_file="$SPLIT_FILE" \
--num_lines=$ZIPNUM_N_LINES \
--num_output_partitions=$ZIPNUM_N_PARTITIONS \
"$WARC_CDX" ""

