
Commit 49daea0

Merge pull request #45 from commoncrawl/sparkccfile
Add CCFileProcessorSparkJob to support file-wise processing
2 parents cc70f85 + 5d6c1a5 commit 49daea0

11 files changed: 660 additions, 34 deletions

.gitignore

Lines changed: 2 additions & 1 deletion

@@ -16,4 +16,5 @@ spark-warehouse/
 
 # get-data.sh puts data into
 crawl-data/
-input/
+input/
+/.pytest_cache/

README.md

Lines changed: 28 additions & 6 deletions
@@ -6,14 +6,18 @@ This project provides examples how to process the Common Crawl dataset with [Apa
 
 + [count HTML tags](./html_tag_count.py) in Common Crawl's raw response data (WARC files)
 
-+ [count web server names](./server_count.py) in Common Crawl's metadata (WAT files or WARC files)
++ [count web server names](./server_count.py) in Common Crawl's metadata (HTTP headers in WAT or WARC files)
 
 + list host names and corresponding [IP addresses](./server_ip_address.py) (WAT files or WARC files)
 
 + [word count](./word_count.py) (term and document frequency) in Common Crawl's extracted text (WET files)
 
++ [md5sum](./md5sum.py): run an external command (`md5sum`) on a list of files from a manifest – WARC, WET, WAT, or any other type of file.
+
 + [extract links](./wat_extract_links.py) from WAT files and [construct the (host-level) web graph](./hostlinks_to_graph.py) – for further details about the web graphs see the project [cc-webgraph](https://github.com/commoncrawl/cc-webgraph)
 
++ [WET extractor](./wet_extractor.py), using FastWARC and Resiliparse. See also [Using FastWARC](#using-fastwarc-to-read-warc-files).
+
 + work with the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see also [cc-index-table](https://github.com/commoncrawl/cc-index-table) and the notes about [querying the columnar index](#querying-the-columnar-index)):
 
   - run a SQL query and [export the result as a table](./cc_index_export.py)
@@ -65,7 +69,7 @@ This will install v3.5.7 of [the PySpark python package](https://spark.apache.or
 
 Install Spark if needed (see the [Spark documentation](https://spark.apache.org/docs/latest/) for guidance). Then, ensure that `spark-submit` and `pyspark` are on your `$PATH`, or prepend `$SPARK_HOME/bin` when running, e.g., `$SPARK_HOME/bin/spark-submit`.
 
-> Note: The PySpark package is required if you want to run the tests in `test/`.
+> Note: The PySpark package and `py4j` are required if you want to run the tests in `test/`. Both are also included in Spark installations, at `$SPARK_HOME/python` and `$SPARK_HOME/python/lib/py4j-*-src.zip`, respectively.
 
 ## Compatibility and Requirements
 
@@ -155,7 +159,10 @@ As the Common Crawl dataset lives in the Amazon Public Datasets program, you can
 
 3. don't forget to deploy all dependencies in the cluster, see [advanced dependency management](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management)
 
-4. also the file `sparkcc.py` needs to be deployed or added as argument `--py-files sparkcc.py` to `spark-submit`. Note: some of the examples require further Python files as dependencies.
+4. also the file `sparkcc.py` needs to be deployed or added as argument `--py-files sparkcc.py` to `spark-submit`. Note: several of the examples require further Python files as dependencies.
+
+
+The script [run_ccpyspark_job_hadoop.sh](./run_ccpyspark_job_hadoop.sh) shows an example of how to run a Spark job on a Hadoop cluster (Spark on YARN). Please do not forget to adapt this script to your needs.
 
 
 ### Command-line options
@@ -206,7 +213,7 @@ Querying the columnar index using cc-pyspark requires authenticated S3 access. T
 
 #### Installation of S3 Support Libraries
 
-While WARC/WAT/WET files are read using boto3, accessing the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see option `--query` of CCIndexSparkJob) is done directly by the SparkSQL engine and requires that S3 support libraries are available. These libs are usually provided when the Spark job is run on a Hadoop cluster running on AWS (eg. EMR). However, they may not be provided for any Spark distribution and are usually absent when running Spark locally (not in a Hadoop cluster). In these situations, the easiest way is to add the libs as required packages by adding `--packages org.apache.hadoop:hadoop-aws:3.2.1` to the arguments of `spark-submit`. This will make [Spark manage the dependencies](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) - the hadoop-aws package and transitive dependencies are downloaded as Maven dependencies. Note that the required version of hadoop-aws package depends on the Hadoop version bundled with your Spark installation, e.g., Spark 3.2.1 bundled with Hadoop 3.2 ([spark-3.2.1-bin-hadoop3.2.tgz](https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz)).
+While WARC/WAT/WET files are read using boto3, accessing the [columnar URL index](https://commoncrawl.org/2018/03/index-to-warc-files-and-urls-in-columnar-format/) (see option `--query` of CCIndexSparkJob) is done directly by the SparkSQL engine and requires that S3 support libraries are available. These libs are usually provided when the Spark job is run on a Hadoop cluster running on AWS (e.g., EMR). However, they are not included with every Spark distribution and are usually absent when running Spark locally (not in a Hadoop cluster). In these situations, the easiest way is to add the libs as required packages by adding `--packages org.apache.hadoop:hadoop-aws:3.3.4` to the arguments of `spark-submit`. This will make [Spark manage the dependencies](https://spark.apache.org/docs/latest/submitting-applications.html#advanced-dependency-management) - the hadoop-aws package and transitive dependencies are downloaded as Maven dependencies. Note that the required version of the hadoop-aws package depends on the Hadoop version bundled with your Spark installation, e.g., Spark 3.5.6 bundled with Hadoop 3.3.4 ([spark-3.5.6-bin-hadoop3.tgz](https://archive.apache.org/dist/spark/spark-3.5.6/spark-3.5.6-bin-hadoop3.tgz)). Please check your Spark package and the underlying Hadoop installation for the correct version.
 
 Please also note that:
 - the schema of the URL referencing the columnar index depends on the actual S3 file system implementation: it's `s3://` on EMR but `s3a://` when using [s3a](https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/index.html#Introducing_the_Hadoop_S3A_client.).
@@ -217,7 +224,8 @@ Please also note that:
 Below is an example call to count words in 10 WARC records hosted under the `.is` top-level domain, using the `--packages` option:
 ```
 spark-submit \
-    --packages org.apache.hadoop:hadoop-aws:3.3.2 \
+    --packages org.apache.hadoop:hadoop-aws:3.3.4 \
+    --conf spark.sql.parquet.mergeSchema=true \
     ./cc_index_word_count.py \
     --input_base_url s3://commoncrawl/ \
     --query "SELECT url, warc_filename, warc_record_offset, warc_record_length, content_charset FROM ccindex WHERE crawl = 'CC-MAIN-2020-24' AND subset = 'warc' AND url_host_tld = 'is' LIMIT 10" \
@@ -241,7 +249,7 @@ Alternatively, it's possible to configure the table schema explicitly:
 - and use it by adding the command-line argument `--table_schema cc-index-schema-flat.json`.
 
 
-### Using FastWARC to parse WARC files
+### Using FastWARC to read WARC files
 
 > [FastWARC](https://resiliparse.chatnoir.eu/en/latest/man/fastwarc.html) is a high-performance WARC parsing library for Python written in C++/Cython. The API is inspired in large parts by WARCIO, but does not aim at being a drop-in replacement.
 
@@ -255,6 +263,20 @@ Some differences between the warcio and FastWARC APIs are hidden from the user i
 
 However, it's recommended that you carefully verify that your custom job implementation works in combination with FastWARC. There are subtle differences between the warcio and FastWARC APIs, including the underlying classes (WARC/HTTP headers and stream implementations). In addition, FastWARC does not support legacy ARC files and does not automatically decode HTTP content and transfer encodings (see [Resiliparse HTTP Tools](https://resiliparse.chatnoir.eu/en/latest/man/parse/http.html#read-chunked-http-payloads)). While content and transfer encodings are already decoded in Common Crawl WARC files, this may not be the case for WARC files from other sources. See also [WARC 1.1 specification, http/https response records](https://iipc.github.io/warc-specifications/specifications/warc-format/warc-1.1/#http-and-https-schemes).
 
+FastWARC makes it possible to filter out unwanted WARC record types at parse time, e.g., to skip request records immediately without even passing them on to the caller. To get the maximum performance out of FastWARC, it's recommended to use these filters by setting the static class variable `fastwarc_record_filter`.
+
+The following examples have been ported to use FastWARC:
++ [count HTML tags](./html_tag_count_fastwarc.py)
++ [count web server names](./server_count_fastwarc.py)
++ list host names and corresponding [IP addresses](./server_ip_address_fastwarc.py)
++ [word count](./word_count_fastwarc.py)
+
+In addition, the following tools are implemented using FastWARC:
++ [extract host-level links](./hostlinks_extract_fastwarc.py)
++ [WET extractor](./wet_extractor.py)
+
+Please refer to the above [description of examples](#common-crawl-pyspark-examples) for additional details.
+
 
 ## Running the Tests
 

html_tag_count_fastwarc.py

Lines changed: 18 additions & 0 deletions
Lines changed: 18 additions & 0 deletions

@@ -0,0 +1,18 @@
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob
from html_tag_count import TagCountJob


class TagCountFastWarcJob(TagCountJob, CCFastWarcSparkJob):
    """ Count HTML tag names in Common Crawl WARC files
    using FastWARC to read WARC files"""

    name = "TagCount"

    fastwarc_record_filter = WarcRecordType.response


if __name__ == '__main__':
    job = TagCountFastWarcJob()
    job.run()

md5sum.py

Lines changed: 24 additions & 0 deletions
Lines changed: 24 additions & 0 deletions

@@ -0,0 +1,24 @@
import subprocess

from sparkcc import CCFileProcessorSparkJob
from pyspark.sql.types import StructType, StructField, StringType


class MD5Sum(CCFileProcessorSparkJob):
    """MD5 sum of each file, calling the command-line utility 'md5sum'"""

    name = "MD5Sum"

    output_schema = StructType([
        StructField("uri", StringType(), True),
        StructField("md5", StringType(), True),
    ])

    def process_file(self, uri, tempfd):
        proc = subprocess.run(['md5sum', tempfd.name], capture_output=True, check=True, encoding='utf8')
        digest = proc.stdout.rstrip().split()[0]
        yield uri, digest


if __name__ == '__main__':
    job = MD5Sum()
    job.run()
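
Editor's note: the CCFileProcessorSparkJob base class introduced by this commit is not limited to shelling out to external tools such as `md5sum`. The sketch below is a hypothetical variant, not part of this commit, that computes a SHA-256 digest in pure Python. It assumes only what md5sum.py above already relies on: `process_file()` receives a URI plus a temporary file object whose `.name` points at the locally buffered copy of the file, and yields tuples matching `output_schema`. The class and job names are made up for illustration.

```
import hashlib

from sparkcc import CCFileProcessorSparkJob
from pyspark.sql.types import StructType, StructField, StringType


class SHA256Sum(CCFileProcessorSparkJob):
    """SHA-256 digest of each file, computed in Python (hypothetical sketch)"""

    name = "SHA256Sum"

    output_schema = StructType([
        StructField("uri", StringType(), True),
        StructField("sha256", StringType(), True),
    ])

    def process_file(self, uri, tempfd):
        # re-open the locally buffered file by name (as md5sum.py does) and
        # hash it in chunks to keep memory usage low for large WARC/WET files
        digest = hashlib.sha256()
        with open(tempfd.name, 'rb') as data:
            for chunk in iter(lambda: data.read(1 << 20), b''):
                digest.update(chunk)
        yield uri, digest.hexdigest()


if __name__ == '__main__':
    job = SHA256Sum()
    job.run()
```

Like md5sum.py, such a job would be submitted with a manifest of file paths (WARC, WET, WAT, or any other files) as input.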

run_ccpyspark_job_hadoop.sh

Lines changed: 84 additions & 0 deletions
Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
#!/bin/sh

# example shell script to run a cc-pyspark job on a Hadoop cluster (Spark on YARN)

SCRIPT="$1"
WAREHOUSE="$2"

if [ -z "$SCRIPT" ] || [ -z "$WAREHOUSE" ]; then
    echo "Usage: $0 <script> <warehouse> <args>..."
    echo "  Run a cc-pyspark job in Spark/Hadoop cluster"
    echo
    echo "Arguments:"
    echo "  <script>      cc-pyspark job implementation"
    echo "  <warehouse>   Spark SQL warehouse directory"
    echo "  <args>...     remaining args are passed to the job"
    echo
    echo "Example:"
    echo "  $0 server_count.py hdfs:///user/max/counts \\"
    echo "       wat_sample.paths servers"
    echo
    echo "Note: don't forget to adapt the number of executors,"
    echo "      input/output partitions, the memory requirements"
    echo "      and other parameters at your need!"
    echo "      Some params can be set per environment variable."
    exit 1
fi

# strip SCRIPT and WAREHOUSE from argument list
shift 2

SPARK_ON_YARN="--master yarn"
SPARK_HADOOP_OPTS=""
SPARK_EXTRA_OPTS=""

# defines SPARK_HOME, SPARK_HADOOP_OPTS and HADOOP_CONF_DIR
. $HOME/workspace/spark/spark_env.sh

NUM_EXECUTORS=${NUM_EXECUTORS:-1}
EXECUTOR_MEM=${EXECUTOR_MEM:-4g}
EXECUTOR_CORES=${EXECUTOR_CORES:-2}

# access data via S3
INPUT_BASE_URL="s3://commoncrawl/"

# temporary directory
# - must exist on task/compute nodes for buffering data
# - should provide several GBs of free space to hold temporarily
#   the downloaded data (WARC, WAT, WET files)
TMPDIR=/data/0/tmp

export PYSPARK_PYTHON="python" # or "python3"

# Python dependencies (for simplicity, include all Python files: cc-pyspark/*.py)
PYFILES=$(ls sparkcc.py sparkcc_fastwarc.py *.py | sort -u | tr '\n' ',')



set -xe

$SPARK_HOME/bin/spark-submit \
    $SPARK_ON_YARN \
    $SPARK_HADOOP_OPTS \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.task.maxFailures=5 \
    --conf spark.executor.memory=$EXECUTOR_MEM \
    --conf spark.driver.memory=3g \
    --conf spark.core.connection.ack.wait.timeout=600s \
    --conf spark.network.timeout=300s \
    --conf spark.shuffle.io.maxRetries=50 \
    --conf spark.shuffle.io.retryWait=600s \
    --conf spark.locality.wait=1s \
    --conf spark.io.compression.codec=zstd \
    --conf spark.checkpoint.compress=true \
    --conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \
    --num-executors $NUM_EXECUTORS \
    --executor-cores $EXECUTOR_CORES \
    --executor-memory $EXECUTOR_MEM \
    --conf spark.sql.warehouse.dir=$WAREHOUSE \
    --conf spark.sql.parquet.outputTimestampType=TIMESTAMP_MILLIS \
    --py-files $PYFILES \
    $SCRIPT \
    --input_base_url $INPUT_BASE_URL \
    --local_temp_dir $TMPDIR \
    "$@"

server_count_fastwarc.py

Lines changed: 1 addition & 1 deletion
Lines changed: 1 addition & 1 deletion

@@ -7,7 +7,7 @@
 class ServerCountFastWarcJob(ServerCountJob, CCFastWarcSparkJob):
     """ Count server names sent in HTTP response header
     (WARC and WAT is allowed as input) using FastWARC
-    to parse WARC files"""
+    to read WARC files"""
 
     name = "CountServers"
 

server_ip_address_fastwarc.py

Lines changed: 22 additions & 0 deletions
Lines changed: 22 additions & 0 deletions

@@ -0,0 +1,22 @@
from fastwarc.warc import WarcRecordType

from sparkcc_fastwarc import CCFastWarcSparkJob
from server_ip_address import ServerIPAddressJob


class ServerIPAddressFastWarcJob(ServerIPAddressJob, CCFastWarcSparkJob):
    """ Collect server IP addresses from WARC response records
    (WARC and WAT is allowed as input) using FastWARC
    to parse WARC files"""

    name = "ServerIPAddresses"

    # process only WARC request or metadata (including WAT) records
    # Note: restrict the filter accordingly, depending on whether
    #       WARC or WAT files are used
    fastwarc_record_filter = WarcRecordType.request | WarcRecordType.metadata


if __name__ == "__main__":
    job = ServerIPAddressFastWarcJob()
    job.run()
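
Editor's note: the comment on `fastwarc_record_filter` above says the filter should be restricted according to the input. As an illustration only (not part of this commit), a run over WAT files alone could presumably keep just metadata records; the subclass name below is hypothetical.

```
from fastwarc.warc import WarcRecordType

from server_ip_address_fastwarc import ServerIPAddressFastWarcJob


class ServerIPAddressFromWatJob(ServerIPAddressFastWarcJob):
    """Hypothetical variant for WAT-only input: keep metadata records only,
    as suggested by the comment on fastwarc_record_filter above"""

    fastwarc_record_filter = WarcRecordType.metadata


if __name__ == "__main__":
    job = ServerIPAddressFromWatJob()
    job.run()
```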
