
Commit af10f62

Spark jobs to build index: README, add shell script and requirements
1 parent a9ff183 commit af10f62

File tree

3 files changed (+194, -0 lines)

README.md

Lines changed: 4 additions & 0 deletions
@@ -3,6 +3,10 @@ WebArchive URL Indexing

This project contains several scripts (MapReduce jobs) for generating URL indexes of web archive collections, usually containing a large number of WARC (or ARC) files. The scripts are designed to run on Hadoop or Amazon EMR to process terabytes or even petabytes of web archive content. Additionally, thanks to the flexibility of the MRJob library, the scripts can also be run on a local machine to build an index cluster.

The indexer was originally implemented based on [mrjob](//github.com/Yelp/mrjob). Because mrjob is no longer maintained, it was ported to PySpark (based on [cc-pyspark](//github.com/commoncrawl/cc-pyspark/)) in 2024/2025. To run the Spark jobs, see [run_index_ccpyspark.sh](run_index_ccpyspark.sh) and [requirements_ccpyspark.txt](requirements_ccpyspark.txt), as well as the documentation of [cc-pyspark](//github.com/commoncrawl/cc-pyspark/).

The description below documents how to run the MapReduce jobs.

## Initial Setup and Usage

Python 3 is required - see the branch `python-2.7` for a previous version running on Python 2.7 (no longer maintained).
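
A minimal sketch of driving the Spark workflow described above, assuming `spark_env.sh` and the cc-pyspark job files are set up as noted in `run_index_ccpyspark.sh`, and the WARC manifest is readable via S3A (the crawl ID and path are illustrative):

```bash
# illustrative crawl ID and manifest path; adjust to the crawl being indexed
./run_index_ccpyspark.sh 2016-44 s3a://commoncrawl/crawl-data/CC-MAIN-2016-44/warc.paths
```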

requirements_ccpyspark.txt

Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
boto3
# PyWB is required to index WARC files when running indexwarcs_cc_pyspark.py
pywb
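
One way to make these dependencies available is a plain pip install into the Python environment referenced by `PYSPARK_PYTHON` (a sketch; package distribution to cluster nodes may be handled differently in your setup):

```bash
pip install -r requirements_ccpyspark.txt
```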

run_index_ccpyspark.sh

Lines changed: 187 additions & 0 deletions
@@ -0,0 +1,187 @@
#!/bin/bash

if [ $# -lt 2 ]; then
    cat <<"EOF"
$0 <year-week-of-crawl> <path-to-warc-file-list> [<split_file>]

Create a Common Crawl CDX ZipNum index for a monthly crawl. All steps are run on Spark.

<year-week-of-crawl>      Year and week of the monthly crawl to be indexed, e.g. 2016-44,
                          used to determine the final location of the index
                            s3://commoncrawl/cc-index/collections/CC-MAIN-2016-44/...
                          Locations of temporary files also include the crawl name.

<path-to-warc-file-list>  List of WARC file objects to be indexed, e.g., the WARC/WAT/WET list
                            s3://commoncrawl/crawl-data/CC-MAIN-2016-44/warc.paths
                          or any subset or union of multiple WARC listings (incl. robots.txt WARCs).
                          Paths in the list must be keys/objects in the Common Crawl bucket
                          or another bucket configured in this script (WARC_PREFIX).
                          The path to the list must be an absolute URL on HDFS or S3A.

                          The "index warcs" step is skipped if an empty string is passed as this argument.
                          Since 2018 the per-WARC CDX files are written directly by the Fetcher
                          and include index fields combined from the WARC response and metadata record.
                          The latter holds the detected language and charset.

<split_file>              Optional split file, reused from a previous crawl with a similar distribution of URLs.
                          If not given, splits are calculated and saved to the default split file path.

EOF
    exit 1
fi


YEARWEEK="$1"
WARC_MANIFEST="$2"
REUSE_SPLIT_FILE="$3"

CRAWL="CC-MAIN-$YEARWEEK"

echo "Generating cc-index for $CRAWL"
echo
echo WARC_MANIFEST="$WARC_MANIFEST"
echo

# Path prefix of WARC/WAT/WET files listed in WARC_MANIFEST
WARC_PREFIX="s3://commoncrawl/"

# AWS S3 bucket to hold CDX files
WARC_CDX_BUCKET="commoncrawl-index-temp"
WARC_CDX_PREFIX="s3://$WARC_CDX_BUCKET/"

# Location of the CDX status table
SPARK_SQL_WAREHOUSE="s3a://$WARC_CDX_BUCKET/$CRAWL"
CDX_STATUS_TABLE="cdx_status"


# glob pattern to match all CDX files generated in step 1 (indexwarcs_cc_pyspark.py)
# or available otherwise. The URI scheme must be supported by Hadoop / HDFS.
WARC_CDX="s3a://$WARC_CDX_BUCKET/$CRAWL/cdx/segments/*/*/*.cdx.gz"


### ZipNum definitions
ZIPNUM_N_LINES=3000
ZIPNUM_N_PARTITIONS=300

# SPLIT_FILE can be reused from a previous crawl with a similar distribution of URLs, see REUSE_SPLIT_FILE
SPLIT_FILE="s3a://$WARC_CDX_BUCKET/$CRAWL/partition_boundaries.json"
# if explicitly configured
if [ -n "$REUSE_SPLIT_FILE" ]; then
    echo "Reusing SPLIT_FILE $REUSE_SPLIT_FILE"
    SPLIT_FILE="$REUSE_SPLIT_FILE"
fi

# temporary output path of part-n files of the zipnum job, concatenated into the cluster.idx
ZIPNUM_TEMP_DIR="s3://$WARC_CDX_BUCKET/$CRAWL/indexes/"

# final path to ZipNum index files
ZIPNUM_CLUSTER_DIR="s3://commoncrawl/cc-index/collections/$CRAWL/indexes/"


# configure S3 buffer directory
# - must exist on task/compute nodes for buffering data
# - should provide several GBs of free space to temporarily hold
#   the downloaded data (WARC, WAT, WET files to be indexed),
#   only relevant for the indexwarcs_cc_pyspark job.
if [ -n "$S3_LOCAL_TEMP_DIR" ]; then
87+
S3_LOCAL_TEMP_DIR="--local_temp_dir=$S3_LOCAL_TEMP_DIR"
88+
else
89+
S3_LOCAL_TEMP_DIR=""
90+
fi
91+
92+
93+
94+
### PySpark definitions
95+
export PYSPARK_PYTHON="python" # or "python3"
96+
97+
# Python dependencies (for simplicity, include all Python files: cc-pyspark/*.py)
98+
PYFILES=sparkcc.py
99+
100+
### Spark configuration
101+
102+
SPARK_ON_YARN="--master yarn"
103+
SPARK_HADOOP_OPTS=""
104+
SPARK_EXTRA_OPTS=""
105+
106+
# defines SPARK_HOME, SPARK_HADOOP_OPTS and HADOOP_CONF_DIR
107+
. spark_env.sh
108+
109+
NUM_EXECUTORS=${NUM_EXECUTORS:-1}
110+
EXECUTOR_CORES=${EXECUTOR_CORES:-2}
111+
# input partitions for the WARC-to-CDX stop
112+
NUM_WARC_INPUT_PARTITIONS=${NUM_WARC_INPUT_PARTITIONS:-10}
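# Example sizing (hypothetical), exported before calling this script; keep
# NUM_WARC_INPUT_PARTITIONS >= NUM_EXECUTORS * EXECUTOR_CORES (checked below):
#   export NUM_EXECUTORS=8 EXECUTOR_CORES=4 EXECUTOR_MEM=4g NUM_WARC_INPUT_PARTITIONS=64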

export LC_ALL=C

set -e
set -x


if [ -n "$WARC_MANIFEST" ]; then
    # Index WARC files in the manifest, write one CDX file per WARC
    EXECUTOR_MEM=${EXECUTOR_MEM:-2g}
    if [[ $NUM_WARC_INPUT_PARTITIONS -lt $((NUM_EXECUTORS*EXECUTOR_CORES)) ]]; then
        echo "The number of input partitions is too low to utilize all executor cores"
        exit 1
    fi
    $SPARK_HOME/bin/spark-submit \
        $SPARK_ON_YARN \
        $SPARK_HADOOP_OPTS \
        --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
        --conf spark.task.maxFailures=5 \
        --conf spark.executor.memory=$EXECUTOR_MEM \
        --conf spark.driver.memory=3g \
        --conf spark.core.connection.ack.wait.timeout=600s \
        --conf spark.network.timeout=300s \
        --conf spark.shuffle.io.maxRetries=50 \
        --conf spark.shuffle.io.retryWait=600s \
        --conf spark.locality.wait=1s \
        --conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \
        --num-executors $NUM_EXECUTORS \
        --executor-cores $EXECUTOR_CORES \
        --executor-memory $EXECUTOR_MEM \
        --conf spark.sql.warehouse.dir="$SPARK_SQL_WAREHOUSE" \
        --py-files $PYFILES \
        indexwarcs_cc_pyspark.py \
        --input_base_url="$WARC_PREFIX" \
        --output_base_url="$WARC_CDX_PREFIX" \
        $S3_LOCAL_TEMP_DIR \
        --num_input_partitions=$NUM_WARC_INPUT_PARTITIONS \
        --num_output_partitions=1 \
        "$WARC_MANIFEST" "$CDX_STATUS_TABLE"
fi


### Create ZipNum index
EXECUTOR_MEM=${EXECUTOR_MEM:-3g}

$SPARK_HOME/bin/spark-submit \
    $SPARK_ON_YARN \
    $SPARK_HADOOP_OPTS \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.task.maxFailures=5 \
    --conf spark.executor.memory=$EXECUTOR_MEM \
    --conf spark.driver.memory=3g \
    --conf spark.core.connection.ack.wait.timeout=600s \
    --conf spark.network.timeout=300s \
    --conf spark.shuffle.io.maxRetries=50 \
    --conf spark.shuffle.io.retryWait=600s \
    --conf spark.locality.wait=1s \
    --conf spark.io.compression.codec=zstd \
    --conf spark.checkpoint.compress=true \
    --conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \
    --num-executors $NUM_EXECUTORS \
    --executor-cores $EXECUTOR_CORES \
    --executor-memory $EXECUTOR_MEM \
    --py-files $PYFILES \
    zipnumcluster_cc_pyspark.py \
    $S3_LOCAL_TEMP_DIR \
    --input_base_url="" \
    --output_base_url="$ZIPNUM_CLUSTER_DIR" \
    --temporary_output_base_url="$ZIPNUM_TEMP_DIR" \
    --partition_boundaries_file="$SPLIT_FILE" \
    --num_lines=$ZIPNUM_N_LINES \
    --num_output_partitions=$ZIPNUM_N_PARTITIONS \
    "$WARC_CDX" ""
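
Two variants of the workflow that the usage message documents, shown as hypothetical invocations (crawl IDs, bucket names and paths are placeholders):

```bash
# skip the "index warcs" step: per-WARC CDX files already exist under WARC_CDX
./run_index_ccpyspark.sh 2016-50 ""

# reuse the partition boundaries (split file) computed for an earlier crawl
./run_index_ccpyspark.sh 2016-50 s3a://commoncrawl/crawl-data/CC-MAIN-2016-50/warc.paths \
    s3a://commoncrawl-index-temp/CC-MAIN-2016-44/partition_boundaries.json
```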
