Skip to content

Commit 5f7a3b1

Browse files
committed
merge
2 parents bd3a2ae + c396062 commit 5f7a3b1

File tree

1 file changed

+37
-10
lines changed

1 file changed

+37
-10
lines changed

ec2/spark_ec2.py

+37-10
Original file line numberDiff line numberDiff line change
@@ -613,12 +613,38 @@ def get_existing_cluster(conn, opts, cluster_name, die_on_error=True):
613613
sys.exit(1)
614614

615615

616+
def get_connection_address(instance):
    """
    Return an address that can be used to connect to ``instance``.

    It is common that a VPC instance has no public DNS name, no public
    IP, and no private DNS name.  The private DNS name can also be
    problematic if your VPN does not resolve private DNS.  The private
    IP address, however, is always present, so it is used as the
    last-resort fallback when none of the other candidates resolve.

    TODO: maybe add a static cache of instance-id to resolved address.

    :type instance: :class:`ec2.instance.Instance`
    :return: string -- the first DNS-resolvable candidate address, or
        the instance's private IP address if none resolve
    """
    import socket

    # Candidate addresses in order of preference: public DNS name,
    # public IP, then private DNS name.  Falsy entries (None / empty
    # string) are skipped.
    addresses = [instance.public_dns_name, instance.ip_address, instance.private_dns_name]
    for address in addresses:
        if address:
            try:
                # Verify the address actually resolves via DNS before
                # handing it to SSH; unresolvable candidates are skipped.
                socket.gethostbyaddr(address)
                return address
            except (socket.gaierror, socket.herror):
                pass

    # Nothing resolved: fall back to the private IP, which always exists.
    return instance.private_ip_address
640+
641+
616642
# Deploy configuration files and run setup scripts on a newly launched
617643
# or started EC2 cluster.
618644

619645

620646
def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
621-
master = master_nodes[0].public_dns_name
647+
master = get_connection_address(master_nodes[0])
622648
if deploy_ssh_key:
623649
print "Generating cluster's SSH key on master..."
624650
key_setup = """
@@ -630,8 +656,9 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):
630656
dot_ssh_tar = ssh_read(master, opts, ['tar', 'c', '.ssh'])
631657
print "Transferring cluster's SSH key to slaves..."
632658
for slave in slave_nodes:
633-
print slave.public_dns_name
634-
ssh_write(slave.public_dns_name, opts, ['tar', 'x'], dot_ssh_tar)
659+
slave_address = get_connection_address(slave)
660+
print slave_address
661+
ssh_write(slave_address, opts, ['tar', 'x'], dot_ssh_tar)
635662

636663
modules = ['spark', 'ephemeral-hdfs', 'persistent-hdfs',
637664
'mapreduce', 'spark-standalone', 'tachyon']
@@ -709,7 +736,7 @@ def is_cluster_ssh_available(cluster_instances, opts, print_ssh_output=False):
709736
Check if SSH is available on all the instances in a cluster.
710737
"""
711738
for i in cluster_instances:
712-
if not is_ssh_available(host=i.ip_address, opts=opts, print_ssh_output=print_ssh_output):
739+
if not is_ssh_available(host=get_connection_address(i), opts=opts, print_ssh_output=print_ssh_output):
713740
return False
714741
else:
715742
return True
@@ -823,7 +850,7 @@ def get_num_disks(instance_type):
823850
#
824851
# root_dir should be an absolute path to the directory with the files we want to deploy.
825852
def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
826-
active_master = master_nodes[0].public_dns_name
853+
active_master = get_connection_address(master_nodes[0])
827854

828855
num_disks = get_num_disks(opts.instance_type)
829856
hdfs_data_dirs = "/mnt/ephemeral-hdfs/data"
@@ -845,9 +872,9 @@ def deploy_files(conn, root_dir, opts, master_nodes, slave_nodes, modules):
845872
spark_v = "%s|%s" % (opts.spark_git_repo, opts.spark_version)
846873

847874
template_vars = {
848-
"master_list": '\n'.join([i.public_dns_name for i in master_nodes]),
875+
"master_list": '\n'.join([get_connection_address(i) for i in master_nodes]),
849876
"active_master": active_master,
850-
"slave_list": '\n'.join([i.public_dns_name for i in slave_nodes]),
877+
"slave_list": '\n'.join([get_connection_address(i) for i in slave_nodes]),
851878
"cluster_url": cluster_url,
852879
"hdfs_data_dirs": hdfs_data_dirs,
853880
"mapred_local_dirs": mapred_local_dirs,
@@ -1052,7 +1079,7 @@ def real_main():
10521079
(master_nodes, slave_nodes) = get_existing_cluster(
10531080
conn, opts, cluster_name, die_on_error=False)
10541081
for inst in master_nodes + slave_nodes:
1055-
print "> %s" % inst.public_dns_name
1082+
print "> %s" % get_connection_address(inst)
10561083

10571084
msg = "ALL DATA ON ALL NODES WILL BE LOST!!\nDestroy cluster %s (y/N): " % cluster_name
10581085
response = raw_input(msg)
@@ -1114,7 +1141,7 @@ def real_main():
11141141

11151142
elif action == "login":
11161143
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
1117-
master = master_nodes[0].public_dns_name
1144+
master = get_connection_address(master_nodes[0])
11181145
print "Logging into master " + master + "..."
11191146
proxy_opt = []
11201147
if opts.proxy_port is not None:
@@ -1138,7 +1165,7 @@ def real_main():
11381165

11391166
elif action == "get-master":
11401167
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
1141-
print master_nodes[0].public_dns_name
1168+
print get_connection_address(master_nodes[0])
11421169

11431170
elif action == "stop":
11441171
response = raw_input(

0 commit comments

Comments
 (0)