81 changes: 43 additions & 38 deletions pydoop/hadoop_utils.py
@@ -80,51 +80,56 @@ def reset(self):

# note that this can be None even after trying detection
def hadoop_home(self):
if not self.__hadoop_home:
hh = os.getenv("HADOOP_HOME", os.getenv("HADOOP_PREFIX"))
if not hh:
exe = subprocess.check_output(
"command -v hadoop", shell=True, universal_newlines=True
).strip()
candidate, child = os.path.split(os.path.dirname(exe))
if child == "bin" and os.path.isdir(candidate):
hh = os.environ["HADOOP_HOME"] = candidate
self.__hadoop_home = hh
return self.__hadoop_home
#if not self.__hadoop_home:
# hh = os.getenv("HADOOP_HOME", os.getenv("HADOOP_PREFIX"))
# if not hh:
# exe = subprocess.check_output(
# "command -v hadoop", shell=True, universal_newlines=True
# ).strip()
# candidate, child = os.path.split(os.path.dirname(exe))
# if child == "bin" and os.path.isdir(candidate):
# hh = os.environ["HADOOP_HOME"] = candidate
# self.__hadoop_home = hh
#return self.__hadoop_home
return "/tmp"

def hadoop_conf(self):
if not self.__hadoop_conf:
error = "Hadoop config not found, try setting HADOOP_CONF_DIR"
try:
self.__hadoop_conf = os.environ["HADOOP_CONF_DIR"]
except KeyError:
hh = self.hadoop_home()
if not hh:
raise RuntimeError(error)
candidate = os.path.join(hh, 'etc', 'hadoop')
if not os.path.isdir(candidate):
raise RuntimeError(error)
self.__hadoop_conf = os.environ["HADOOP_CONF_DIR"] = candidate
return self.__hadoop_conf
#if not self.__hadoop_conf:
# error = "Hadoop config not found, try setting HADOOP_CONF_DIR"
# try:
# self.__hadoop_conf = os.environ["HADOOP_CONF_DIR"]
# except KeyError:
# hh = self.hadoop_home()
# if not hh:
# raise RuntimeError(error)
# candidate = os.path.join(hh, 'etc', 'hadoop')
# if not os.path.isdir(candidate):
# raise RuntimeError(error)
# self.__hadoop_conf = os.environ["HADOOP_CONF_DIR"] = candidate
#return self.__hadoop_conf
return "/tmp"

def hadoop_params(self):
if not self.__hadoop_params:
params = {}
hadoop_conf = self.hadoop_conf()
for n in "hadoop", "core", "hdfs", "mapred":
fn = os.path.join(hadoop_conf, "%s-site.xml" % n)
try:
params.update(parse_hadoop_conf_file(fn))
except (IOError, HadoopXMLError):
pass # silently ignore, as in Hadoop
self.__hadoop_params = params
return self.__hadoop_params
#if not self.__hadoop_params:
# params = {}
# hadoop_conf = self.hadoop_conf()
# for n in "hadoop", "core", "hdfs", "mapred":
# fn = os.path.join(hadoop_conf, "%s-site.xml" % n)
# try:
# params.update(parse_hadoop_conf_file(fn))
# except (IOError, HadoopXMLError):
# pass # silently ignore, as in Hadoop
# self.__hadoop_params = params
#return self.__hadoop_params
return {}

def hadoop_classpath(self):
if not self.__hadoop_classpath:
cp = subprocess.check_output(
"hadoop classpath --glob", shell=True, universal_newlines=True
).strip()
#cp = subprocess.check_output(
# "hadoop classpath --glob", shell=True, universal_newlines=True
#).strip()
# libhdfs.go does not need these jar files
cp = ""
# older hadoop versions ignore --glob
if 'hadoop-common' not in cp:
cp = ':'.join(':'.join(glob.iglob(_)) for _ in cp.split(':'))
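With detection stubbed out, every caller now sees fixed values: hadoop_home() and hadoop_conf() always return "/tmp", hadoop_params() returns an empty dict, and hadoop_classpath() no longer shells out to the hadoop classpath command. A minimal sketch of the resulting behaviour, assuming these methods live on pydoop.hadoop_utils.PathFinder (the class name does not appear in this hunk):

    # Sketch only: PathFinder is an assumption; only the method names are
    # visible in the hunk above.
    from pydoop.hadoop_utils import PathFinder

    pf = PathFinder()
    assert pf.hadoop_home() == "/tmp"    # HADOOP_HOME / HADOOP_PREFIX are no longer consulted
    assert pf.hadoop_conf() == "/tmp"    # HADOOP_CONF_DIR is no longer consulted
    assert pf.hadoop_params() == {}      # no *-site.xml files are parsed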
4 changes: 2 additions & 2 deletions pydoop/hdfs/core/__init__.py
@@ -24,8 +24,8 @@


def init():
import pydoop.utils.jvm as jvm
jvm.load_jvm_lib()
#import pydoop.utils.jvm as jvm
#jvm.load_jvm_lib()
try:
# NOTE: JVM must be already instantiated
import pydoop.native_core_hdfs
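With the explicit JVM load gone, init() goes straight to importing pydoop.native_core_hdfs and leaves the "JVM must be already instantiated" note to the environment; the rebuilt extension is assumed not to need a JVM at all. If a given build still does, the caller would have to bring the JVM up first; a minimal sketch, assuming the commented-out pydoop.utils.jvm helper is still importable:

    # Sketch under the assumption that pydoop.utils.jvm.load_jvm_lib (the call
    # removed above) is still present in this tree.
    import pydoop.utils.jvm as jvm
    jvm.load_jvm_lib()     # load libjvm before the native import

    from pydoop.hdfs import core
    core.init()            # import of pydoop.native_core_hdfs can now succeed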
50 changes: 34 additions & 16 deletions pydoop/hdfs/fs.py
@@ -67,26 +67,44 @@ def _get_ip(host, default=None):
ip = "0.0.0.0" # same as socket.gethostbyname("")
return ip if ip != "0.0.0.0" else default

def _get_hopsfs_connection_info():
# Read LIBHDFS_DEFAULT_FS and LIBHDFS_DEFAULT_USER environment variables
default_fs = os.getenv("LIBHDFS_DEFAULT_FS")
default_user = os.getenv("LIBHDFS_DEFAULT_USER")

# Split host and port from LIBHDFS_DEFAULT_FS
if default_fs:
host, port = default_fs.split(":")
else:
host, port = None, None

return host, port, default_user

def _get_connection_info(host, port, user):
fs = core_hdfs_fs(host, port, user)
res = urlparse(fs.get_working_directory())
if not res.scheme or res.scheme == "file":
h, p, u = "", 0, getpass.getuser()
fs.set_working_directory(os.getcwd()) # libhdfs "remembers" old cwd

# get hopsfs connection info from env variables
h, p, u = _get_hopsfs_connection_info()
if host is not None and port is not None and user is not None:
return h, int(p), u, fs
else:
try:
h, p = res.netloc.split(":")
except ValueError:
h, p = res.netloc, common.DEFAULT_PORT

# try to find an IP address if we can't extract it from res.netloc
if not res.netloc:
hosts = fs.get_hosts(str(res.path), 0, 0)
if hosts and hosts[0] and hosts[0][0]:
h, p = hosts[0][0], common.DEFAULT_PORT
u = res.path.split("/", 2)[2]
return h, int(p), u, fs
res = urlparse(fs.get_working_directory())
if not res.scheme or res.scheme == "file":
h, p, u = "", 0, getpass.getuser()
fs.set_working_directory(os.getcwd()) # libhdfs "remembers" old cwd
else:
try:
h, p = res.netloc.split(":")
except ValueError:
h, p = res.netloc, common.DEFAULT_PORT

# try to find an IP address if we can't extract it from res.netloc
if not res.netloc:
hosts = fs.get_hosts(str(res.path), 0, 0)
if hosts and hosts[0] and hosts[0][0]:
h, p = hosts[0][0], common.DEFAULT_PORT
u = res.path.split("/", 2)[2]
return h, int(p), u, fs


def _default_fs():
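_get_hopsfs_connection_info() assumes LIBHDFS_DEFAULT_FS holds a bare host:port pair (one colon, no scheme) and LIBHDFS_DEFAULT_USER a plain user name; the port stays a string until _get_connection_info() applies int(). A short sketch with hypothetical values:

    # Hypothetical values; the host:port format is inferred from the single
    # split(":") in _get_hopsfs_connection_info above.
    import os

    os.environ["LIBHDFS_DEFAULT_FS"] = "namenode.example.com:8020"
    os.environ["LIBHDFS_DEFAULT_USER"] = "hopsworks"

    host, port, user = _get_hopsfs_connection_info()
    # host == "namenode.example.com", port == "8020" (still a string), user == "hopsworks"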
8 changes: 8 additions & 0 deletions scripts/make_distro
@@ -0,0 +1,8 @@
#!/bin/bash

# install
# activate env
#CFLAGS=-I/usr/include/tirpc pip install .

CFLAGS=-I/usr/include/tirpc python setup.py sdist

16 changes: 8 additions & 8 deletions setup.py
@@ -145,14 +145,14 @@ def write_version(filename="pydoop/version.py"):
Extension(
'pydoop.native_core_hdfs',
include_dirs=[
'src/libhdfs',
#'src/libhdfs',
'src/libhdfs/include',
'src/libhdfs/os/posix',
#'src/libhdfs/os/posix',
],
sources=list(itertools.chain(
glob.iglob('src/libhdfs/*.c'),
glob.iglob('src/libhdfs/common/*.c'),
glob.iglob('src/libhdfs/os/posix/*.c'),
#glob.iglob('src/libhdfs/*.c'),
#glob.iglob('src/libhdfs/common/*.c'),
#glob.iglob('src/libhdfs/os/posix/*.c'),
glob.iglob('src/native_core_hdfs/*.cc')
)),
extra_compile_args=EXTRA_COMPILE_ARGS,
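With the libhdfs include directories and C sources commented out, the extension is built from the C++ bindings alone. Sketched below is the net effect of this hunk, with EXTRA_COMPILE_ARGS reduced to a placeholder (the real flags, and the remaining keyword arguments, are defined elsewhere in setup.py):

    # Sketch of the effective extension after this change; not a drop-in
    # replacement for the full definition in setup.py.
    import glob
    from setuptools import Extension

    EXTRA_COMPILE_ARGS = []  # placeholder; the real flags are set earlier in setup.py

    native_core_hdfs = Extension(
        'pydoop.native_core_hdfs',
        include_dirs=['src/libhdfs/include'],
        sources=list(glob.iglob('src/native_core_hdfs/*.cc')),
        extra_compile_args=EXTRA_COMPILE_ARGS,
    )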
@@ -290,8 +290,8 @@ def __finalize_hdfs(self, ext):

# called for each extension, after compiler has been set up
def build_extension(self, ext):
if ext.name == "pydoop.native_core_hdfs":
self.__finalize_hdfs(ext)
#if ext.name == "pydoop.native_core_hdfs":
# self.__finalize_hdfs(ext)
build_ext.build_extension(self, ext)


@@ -317,7 +317,7 @@ def run(self):
build.run(self)
try:
self.create_tmp()
self.build_java()
#self.build_java()
finally:
# On NFS, if we clean up right away we have issues with
# NFS handles being still in the directory trees to be