|
#!/bin/sh

# example shell script to run a cc-pyspark job on a Hadoop cluster (Spark on YARN)

SCRIPT="$1"
WAREHOUSE="$2"

if [ -z "$SCRIPT" ] || [ -z "$WAREHOUSE" ]; then
    echo "Usage: $0 <script> <warehouse> <args>..."
    echo "  Run a cc-pyspark job on a Spark/Hadoop cluster"
    echo
    echo "Arguments:"
    echo "  <script>      cc-pyspark job implementation"
    echo "  <warehouse>   Spark SQL warehouse directory"
    echo "  <args>...     remaining arguments are passed to the job"
    echo
    echo "Example:"
    echo "  $0 server_count.py hdfs:///user/max/counts \\"
    echo "       wat_sample.paths servers"
    echo
    echo "Note: don't forget to adapt the number of executors,"
    echo "      input/output partitions, the memory requirements"
    echo "      and other parameters to your needs!"
    echo "      Some parameters can be set via environment variables."
    exit 1
fi

# strip SCRIPT and WAREHOUSE from argument list
shift 2

SPARK_ON_YARN="--master yarn"
SPARK_HADOOP_OPTS=""
SPARK_EXTRA_OPTS=""

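# Note: for a quick test without a YARN cluster, the master could be pointed
# at a local Spark instead, e.g. SPARK_ON_YARN="--master local[4]"
# (example value, adapt the number of local worker threads as needed).
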
# defines SPARK_HOME, SPARK_HADOOP_OPTS and HADOOP_CONF_DIR
. "$HOME/workspace/spark/spark_env.sh"

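# A minimal spark_env.sh might look like the sketch below; the paths are
# placeholders and must be adapted to the local Spark/Hadoop installation:
#
#   export SPARK_HOME=/opt/spark
#   export HADOOP_CONF_DIR=/etc/hadoop/conf
#   SPARK_HADOOP_OPTS=""   # extra --conf options, e.g. for S3 credentials
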
NUM_EXECUTORS=${NUM_EXECUTORS:-1}
EXECUTOR_MEM=${EXECUTOR_MEM:-4g}
EXECUTOR_CORES=${EXECUTOR_CORES:-2}

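# The defaults above are overridden by environment variables of the same name,
# e.g. (illustrative values, <this_script> stands for this file):
#
#   NUM_EXECUTORS=8 EXECUTOR_MEM=8g EXECUTOR_CORES=4 \
#     ./<this_script> server_count.py hdfs:///user/max/counts wat_sample.paths servers
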
# access data via S3
INPUT_BASE_URL="s3://commoncrawl/"

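# Depending on where the cluster runs, the data could instead be fetched over
# HTTP(S) from the Common Crawl servers, e.g.
#   INPUT_BASE_URL="https://data.commoncrawl.org/"
# (check that the cc-pyspark version in use supports HTTP(S) input).
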
# temporary directory
# - must exist on task/compute nodes for buffering data
# - should provide several GBs of free space to temporarily hold
#   the downloaded data (WARC, WAT, WET files)
TMPDIR=/data/0/tmp

export PYSPARK_PYTHON="python" # or "python3"

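# If the jobs need additional Python packages on the executors, PYSPARK_PYTHON
# can point to the interpreter of a virtual environment available on all
# cluster nodes (hypothetical path):
#   export PYSPARK_PYTHON="/opt/venv/cc-pyspark/bin/python"
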
# Python dependencies (for simplicity, include all Python files: cc-pyspark/*.py)
PYFILES=$(ls sparkcc.py sparkcc_fastwarc.py *.py | sort -u | tr '\n' ',')
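# PYFILES now holds a comma-separated list of the .py files in the working
# directory, e.g. "server_count.py,sparkcc.py,sparkcc_fastwarc.py,..." (names
# are examples); the script must therefore be run from the cc-pyspark directory.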

set -xe

$SPARK_HOME/bin/spark-submit \
    $SPARK_ON_YARN \
    $SPARK_HADOOP_OPTS \
    $SPARK_EXTRA_OPTS \
    --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
    --conf spark.task.maxFailures=5 \
    --conf spark.executor.memory=$EXECUTOR_MEM \
    --conf spark.driver.memory=3g \
    --conf spark.core.connection.ack.wait.timeout=600s \
    --conf spark.network.timeout=300s \
    --conf spark.shuffle.io.maxRetries=50 \
    --conf spark.shuffle.io.retryWait=600s \
    --conf spark.locality.wait=1s \
    --conf spark.io.compression.codec=zstd \
    --conf spark.checkpoint.compress=true \
    --conf spark.executorEnv.LD_LIBRARY_PATH=/usr/lib/hadoop/lib/native \
    --num-executors $NUM_EXECUTORS \
    --executor-cores $EXECUTOR_CORES \
    --executor-memory $EXECUTOR_MEM \
    --conf spark.sql.warehouse.dir=$WAREHOUSE \
    --conf spark.sql.parquet.outputTimestampType=TIMESTAMP_MILLIS \
    --py-files $PYFILES \
    $SCRIPT \
    --input_base_url $INPUT_BASE_URL \
    --local_temp_dir $TMPDIR \
    "$@"