# This file was generated from Run_Spark_on_Google_Colab.ipynb with nbconvert
# Source: https://github.com/groda/big_data
#!/usr/bin/env python
# coding: utf-8
# <a href="https://colab.research.google.com/github/groda/big_data/blob/master/Run_Spark_on_Google_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>
# <a href="https://github.com/groda/big_data"><div><img src="https://github.com/groda/big_data/blob/master/logo_bdb.png?raw=true" align=right width="90" alt="Logo Big Data for Beginners"></div></a>
# # Run Apache Spark on Google Colab
#
# This is a compact guide on how to set up Apache Spark on Google Colab.
#
# A more detailed walkthrough of how to set up Spark on a single machine in *standalone mode* is presented in [Hadoop: Setting up Spark Standalone on Google Colab](https://github.com/groda/big_data/blob/master/Hadoop_Setting_up_Spark_Standalone_on_Google_Colab.ipynb).
# ## Setup Spark
#
# Set up Apache Spark in four steps: 1️⃣ 2️⃣ 3️⃣ 4️⃣ (step 0️⃣ is the Java installation, which is skipped because Java is already available in Google Colab).
#
# The following code should also run on any Ubuntu machine or Docker container, except for the web UI links, which are specific to Google Colab.
# In[1]:
import requests
import subprocess
import os
import re
import socket
import shutil
import time
import sys
def run(cmd):
    # run a shell command
    try:
        # Run the command and capture stdout and stderr
        subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        # Access stdout (stderr redirected to stdout)
        stdout_result = subprocess_output.stdout.strip().splitlines()[-1]
        # Process the results as needed
        print(f'✅ {stdout_result}')
        return stdout_result
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print(f"Command failed with return code {e.returncode}")
        print("stdout:", e.stdout)
def is_java_installed():
    return shutil.which("java")
def install_java():
    # Uncomment and modify the desired version
    # java_version= 'openjdk-11-jre-headless'
    # java_version= 'default-jre'
    # java_version= 'openjdk-17-jre-headless'
    # java_version= 'openjdk-18-jre-headless'
    java_version = 'openjdk-19-jre-headless'
    os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-19-openjdk-amd64'
    print(f"Java not found. Installing {java_version} ... (this might take a while)")
    try:
        cmd = f"apt install -y {java_version}"
        subprocess_output = subprocess.run(cmd, shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
        stdout_result = subprocess_output.stdout
        # Process the results as needed
        print(f'✅ Done installing Java {java_version}')
    except subprocess.CalledProcessError as e:
        # Handle the error if the command returns a non-zero exit code
        print(f"Command failed with return code {e.returncode}")
        print("stdout:", e.stdout)
print("
0️⃣ Install Java if not available")
if is_java_installed():
print("✅ Java is already installed.")
else:
install_java()
print("
1️⃣ Download and install Hadoop and Spark")
# URL for downloading Hadoop and Spark
SPARK_VERSION = "3.5.3"
HADOOP_SPARK_URL = "https://dlcdn.apache.org/spark/spark-" + SPARK_VERSION + \
"/spark-" + SPARK_VERSION + "-bin-hadoop3.tgz"
r = requests.head(HADOOP_SPARK_URL)
if r.status_code >= 200 and r.status_code < 400:
    print(f'✅ {HADOOP_SPARK_URL} was found')
else:
    SPARK_CDN = "https://dlcdn.apache.org/spark/"
    print(f'⚠️ {HADOOP_SPARK_URL} was NOT found.\nCheck for available Spark versions in {SPARK_CDN}')
# set some environment variables
os.environ['SPARK_HOME'] = os.path.join(os.getcwd(), os.path.splitext(os.path.basename(HADOOP_SPARK_URL))[0])
os.environ['PATH'] = ':'.join([os.path.join(os.environ['SPARK_HOME'], 'bin'), os.environ['PATH']])
os.environ['PATH'] = ':'.join([os.path.join(os.environ['SPARK_HOME'], 'sbin'), os.environ['PATH']])
# download Spark
# using --no-clobber option will prevent wget from downloading file if already present
# shell command: wget --no-clobber $HADOOP_SPARK_URL
cmd = f"wget --no-clobber {HADOOP_SPARK_URL}"
run(cmd)
# uncompress
try:
    # Run the command and capture stdout and stderr
    cmd = "([ -d $(basename {0}|sed 's/\\.[^.]*$//') ] && echo -n 'Folder already exists') || (tar xzf $(basename {0}) && echo 'Uncompressed Spark distribution')"
    subprocess_output = subprocess.run(cmd.format(HADOOP_SPARK_URL), shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    # Access stdout (stderr redirected to stdout)
    stdout_result = subprocess_output.stdout
    # Process the results as needed
    print(f'✅ {stdout_result}')
except subprocess.CalledProcessError as e:
    # Handle the error if the command returns a non-zero exit code
    print(f"Command failed with return code {e.returncode}")
    print("stdout:", e.stdout)
print("
2️⃣ Start Spark engine")
# start master (first stop it in case it's already running)
# shell command: $SPARK_HOME/sbin/start-master.sh
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-master.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-master.sh')
out = run(cmd)
# start one worker (first stop it in case it's already running)
# shell command: $SPARK_HOME/sbin/start-worker.sh spark://${HOSTNAME}:7077
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-worker.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-worker.sh') + ' ' + 'spark://'+socket.gethostname()+':7077'
run(cmd)
print("
3️⃣ Start Master Web UI")
# get master UI's port number
# the subprocess that's starting the master with start-master.sh
# might still not be ready with assigning the port number at this point
# therefore we check the logfile a few times (attempts=10) to see if the port
# has been assigned. This might take 1-2 seconds.
master_log = out.partition("logging to")[2].strip()
print("Search for port number in log file {}".format(master_log))
attempts = 10
search_pattern = r"Successfully started service 'MasterUI' on port (\d+)"
found = False
for i in range(attempts):
    with open(master_log) as log:
        found = re.search(search_pattern, log.read())
    if found:
        webUIport = found.group(1)
        print(f"✅ Master UI is available at localhost:{webUIport} (attempt nr. {i})")
        break
    time.sleep(2)  # need to try until port information is found in the logfile
if not found:
    print("Could not find port for Master Web UI\n")
IN_COLAB = 'google.colab' in sys.modules
if IN_COLAB:
    # serve the Web UI on Colab
    print("Click on the link below to open the Spark Web UI 🚀")
    from google.colab import output
    output.serve_kernel_port_as_window(webUIport)
print("
4️⃣ Start history server")
# start history server
# shell command: mkdir -p /tmp/spark-events
# shell command: $SPARK_HOME/sbin/start-history-server.sh
spark_events_dir = os.path.join('/tmp', 'spark-events')
if not os.path.exists(spark_events_dir):
    os.mkdir(spark_events_dir)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'stop-history-server.sh')
run(cmd)
cmd = os.path.join(os.environ['SPARK_HOME'], 'sbin', 'start-history-server.sh')
run(cmd)
if IN_COLAB:
    # serve the History Server
    print("Click on the link below to open the Spark History Server Web UI 🚀")
    output.serve_kernel_port_as_window(18080)
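# Optional, not part of the original notebook: wait briefly until the History
# Server answers on its default port 18080, so the link above does not point to
# a server that is still starting up.
for _ in range(10):
    try:
        requests.get("http://localhost:18080", timeout=1)
        print("✅ History Server is answering on port 18080")
        break
    except requests.exceptions.RequestException:
        time.sleep(1)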
# ## Run a couple of examples
#
# We are going to run pre-built Java examples that come with the Spark distribution.
# ### Run the Java application `SparkPi` to estimate $\pi$
#
# Run the pre-built example `org.apache.spark.examples.SparkPi` that comes with the Spark distribution using `bash` to submit the job.
#
# $100$ is the number of partitions (slices) the sampling work is split into.
# In[2]:
get_ipython().run_cell_magic('bash', '', '''
EXAMPLES_JAR=$(find $SPARK_HOME/examples/jars/ -name "spark-examples*")
$SPARK_HOME/bin/spark-submit \\
  --class org.apache.spark.examples.SparkPi \\
  --master spark://${HOSTNAME}:7077 \\
  --conf spark.eventLog.enabled=true \\
  $EXAMPLES_JAR \\
  100 \\
  2>/tmp/SparkPi_bash.log
''')
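# The same estimate can also be reproduced from Python against the standalone
# cluster started above. The following is a minimal sketch, not part of the
# original notebook: it assumes a `pyspark` package matching the downloaded
# Spark version has been installed (e.g. `pip install pyspark==3.5.3`); if
# `pyspark` is not available, the cell is skipped.
# In[ ]:
try:
    from pyspark.sql import SparkSession
except ImportError:
    SparkSession = None

if SparkSession is not None:
    import random
    spark = SparkSession.builder \
        .master('spark://' + socket.gethostname() + ':7077') \
        .appName('PySparkPi') \
        .getOrCreate()

    partitions = 100
    n = 100000 * partitions

    def inside(_):
        # sample a random point in the unit square and test whether it falls
        # inside the quarter circle of radius 1
        x, y = random.random(), random.random()
        return x * x + y * y < 1

    count = spark.sparkContext.parallelize(range(n), partitions).filter(inside).count()
    print(f"Pi is roughly {4.0 * count / n}")
    spark.stop()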
# ### Java Word Count
#
# The source code for this example is available on GitHub at: https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
# In[3]:
get_ipython().run_cell_magic('bash', '', '''
echo "Downloading file ..."
URL="https://www.gutenberg.org/cache/epub/71036/pg71036.txt"
([ -f datafile.txt ] && echo "File already exists") || curl -o datafile.txt $URL
''')
# In[4]:
get_ipython().run_cell_magic('bash', '', '''
EXAMPLES_JAR=$(find $SPARK_HOME/examples/jars/ -name "spark-examples*")
$SPARK_HOME/bin/spark-submit \\
  --class org.apache.spark.examples.JavaWordCount \\
  --master spark://${HOSTNAME}:7077 \\
  --conf spark.eventLog.enabled=true \\
  $EXAMPLES_JAR \\
  datafile.txt \\
  2>/tmp/JavaWordCount.log \\
  1>/tmp/JavaWordCount.out
head /tmp/JavaWordCount.out
''')
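# For a quick look at the most frequent words, the output file can be parsed
# directly in Python. This is a small helper, not part of the original
# notebook; it assumes JavaWordCount wrote lines of the form "word: count" to
# /tmp/JavaWordCount.out (the stdout redirection above).
# In[ ]:
from collections import Counter

word_counts = Counter()
with open('/tmp/JavaWordCount.out') as f:
    for line in f:
        word, sep, count = line.rstrip('\n').rpartition(': ')
        if sep and count.isdigit():
            word_counts[word] = int(count)
print(word_counts.most_common(10))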
# To download a larger amount of data from the Gutenberg collection, see the tutorial [Explore and download books from the Gutenberg Books collection](https://github.com/groda/big_data/blob/master/GutenbergBooks.ipynb) from the <a href="https://github.com/groda/big_data/blob/master/README.md">BDb repository</a>.
# ## Where to find things
# ### Spark Web UI
#
# The Spark Web UI is available at:
# In[5]:
if IN_COLAB:
    from google.colab.output import eval_js
    print(eval_js("google.colab.kernel.proxyPort(" + str(webUIport) + ")"))
# In the free tier of Google Colab this functionality might not be available (see https://research.google.com/colaboratory/faq.html#limitations-and-restrictions). As an alternative, you can use [ngrok](https://ngrok.com/) after signing up for a free account.
# #### Use ngrok to access the Web UI
# Check the NGROK box below if you want to use ngrok (by default this is set to `False`).
# In[6]:
# set this to True if you want to use ngrok
NGROK = False #@param {type:"boolean"}
# We are going to use the Python ngrok client `pyngrok` (see the [Colab example](https://pyngrok.readthedocs.io/en/latest/integrations.html#colab-http-example)).
# In[7]:
if NGROK:
    get_ipython().system('pip install pyngrok')
    from pyngrok import ngrok, conf
    import getpass
    print("Enter your authtoken, which can be copied from https://dashboard.ngrok.com/get-started/your-authtoken")
    authtoken = getpass.getpass()
    conf.get_default().auth_token = authtoken
# **Note:** It might be necessary to close other open sessions by stopping ngrok agents in [https://dashboard.ngrok.com/tunnels/agents](https://dashboard.ngrok.com/tunnels/agents) (the ngrok free tier allows only one simultaneous agent session).
# After entering the ngrok authorization token, you can open a connection.
# In[8]:
if NGROK:
    # close all existing connections (https://pyngrok.readthedocs.io/en/latest/#get-active-tunnels)
    for t in ngrok.get_tunnels():
        ngrok.disconnect(t.public_url)
    # Open an ngrok tunnel to the HTTP server
    public_url = ngrok.connect(webUIport).public_url
    print(f'Click on {public_url} to open the Spark Master Web UI')
# You can safely ignore the warning, since we are not disclosing any confidential information, and proceed by clicking the "Visit site" button.
# 
# 
# ### Spark History Server
#
# The Spark History Server is available at:
# In[9]:
if IN_COLAB:
    from google.colab.output import eval_js
    print(eval_js("google.colab.kernel.proxyPort(" + str(18080) + ")"))
# With ngrok:
# In[10]:
if NGROK:
    # Open an ngrok tunnel to the HTTP server
    public_url = ngrok.connect(18080).public_url
    print(f'Click on {public_url} to open the Spark History Server Web UI')
# 
# ### Logs for the Spark Master
# In[11]:
get_ipython().system('head -20 $SPARK_HOME/logs/*Master*.out')
# ### Logs for the Spark Worker
# In[12]:
get_ipython().system('head -20 $SPARK_HOME/logs/*Worker*.out')
# ### Spark events (used by History Server)
# In[13]:
get_ipython().system('head -20 /tmp/spark-events/*')
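# The same information is exposed through the History Server's REST API (see
# the Spark monitoring documentation). A minimal check, not part of the
# original notebook, assuming the server is reachable on its default port 18080:
# In[ ]:
apps = requests.get("http://localhost:18080/api/v1/applications", timeout=5).json()
for app in apps:
    print(app.get('id'), '-', app.get('name'))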
# ### Spark configuration
#
# To customize Spark, use the configuration templates in `$SPARK_HOME/conf` (copy a template and remove the `.template` extension).
#
# In[14]:
get_ipython().system('ls -al $SPARK_HOME/conf')
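# For example, to make event logging permanent instead of passing
# --conf spark.eventLog.enabled=true to every spark-submit, the property can be
# written to spark-defaults.conf. This is a sketch, not part of the original
# notebook; it assumes the stock spark-defaults.conf.template shipped with the
# distribution.
# In[ ]:
conf_dir = os.path.join(os.environ['SPARK_HOME'], 'conf')
template = os.path.join(conf_dir, 'spark-defaults.conf.template')
defaults = os.path.join(conf_dir, 'spark-defaults.conf')
if not os.path.exists(defaults):
    shutil.copy(template, defaults)
with open(defaults, 'a') as f:
    f.write('\nspark.eventLog.enabled true\n')
print(f"Wrote spark.eventLog.enabled to {defaults}")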
# ## Shutdown
#
# Stop all services.
# In[15]:
get_ipython().run_cell_magic('bash', '', '''
$SPARK_HOME/sbin/stop-history-server.sh
$SPARK_HOME/sbin/stop-worker.sh
$SPARK_HOME/sbin/stop-master.sh
''')
# Terminate the ngrok processes.
# In[16]:
if NGROK:
    ngrok.kill()