5 changes: 4 additions & 1 deletion .github/workflows/unit-tests.yaml
@@ -45,7 +45,10 @@ jobs:
python -m pip install --upgrade pip flake8 pytest
pip install -r requirements.txt
# Optional Dependency for HDF Checkpointing
pip install tables
python -m pip install tables
# All Possible Optional Dependencies for Excel Checkpointing
# python -m pip install XlsxWriter openpyxl pyxlsb xlrd xlwt
python -m pip install xlrd xlwt
python setup.py install
python setup.py build_ext --inplace
python -m pip list
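Since the workflow step above only installs a subset of the possible Excel engines, a quick local check can show which optional checkpoint dependencies are actually importable before relying on a given format. This is a hedged sketch, not part of the PR; the names below are the import names corresponding to the pip packages mentioned in the workflow (note that the XlsxWriter package is imported as "xlsxwriter").

import importlib.util

# Report which optional checkpoint dependencies are importable locally.
for module in ("tables", "xlrd", "xlwt", "openpyxl", "xlsxwriter", "pyxlsb"):
    status = "found" if importlib.util.find_spec(module) else "missing"
    print("{:<12} {}".format(module, status))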
152 changes: 150 additions & 2 deletions hatchet/graphframe.py
@@ -40,6 +40,24 @@ def parallel_apply(filter_function, dataframe, queue):
queue.put(filtered_df)


# TODO Move into global configuration when Connor's PR is merged
_format_extensions = {
".hdf5": "hdf",
".hdf": "hdf",
".h5": "hdf",
".csv": "csv",
".xls": "excel",
".xlsx": "excel",
".xlsm": "excel",
".xlsb": "excel",
".odf": "excel",
".ods": "excel",
".odt": "excel",
".pkl": "pickle",
".pickle": "pickle",
}


class GraphFrame:
"""An input dataset is read into an object of this type, which includes a graph
and a dataframe.
@@ -299,18 +317,134 @@ def from_lists(*lists):
return gf

@staticmethod
def from_hdf(filename, **kwargs):
def load(filename, fileformat=None, **kwargs):
format_priority = ["hdf", "pickle", "csv", "excel"]
fformat = fileformat
if fformat is None:
# TODO
# for ext in self._format_extensions.keys():
for ext in _format_extensions.keys():
if filename.endswith(ext):
# TODO
# fformat = self._format_extensions[ext]
fformat = _format_extensions[ext]
break
if fformat is not None and fformat in format_priority:
format_priority.remove(fformat)
try:
# TODO
# gf = self._load_func_dict[fformat](filename, **kwargs)
gf = _load_func_dict[fformat](filename, **kwargs)
print("Successfully saved to {}".format(fformat))
return gf
except ImportError:
print(
"Could not load from {} format. Trying alternatives.".format(
fformat
)
)
for form in format_priority:
print("Trying {}".format(form))
try:
# TODO
# gf = self._load_func_dict[form](filename, **kwargs)
gf = _load_func_dict[form](filename, **kwargs)
print("Sucessfully loaded from {}".format(form))
return gf
except ImportError:
print("Could not load from {} format.".format(form))
raise IOError(
"Could not parse {} with the available formats. Make sure you have the necessary dependencies installed.".format(
filename
)
)

def save(self, filename, fileformat=None, **kwargs):
format_priority = ["hdf", "pickle", "csv", "excel"]
fformat = fileformat
if fformat is None:
# TODO
# for ext in self._format_extensions.keys():
for ext in _format_extensions.keys():
if filename.endswith(ext):
# TODO
# fformat = self._format_extensions[ext]
fformat = _format_extensions[ext]
break
if fformat is not None and fformat in format_priority:
format_priority.remove(fformat)
try:
# TODO
# self._save_func_dict[fformat](self, filename, **kwargs)
_save_func_dict[fformat](self, filename, **kwargs)
print("Successfully saved to {}".format(fformat))
return
except ImportError:
print(
"Could not save to {} format. Trying alternatives.".format(fformat)
)
for form in format_priority:
print("Trying {}".format(form))
try:
# TODO
# self._save_func_dict[form](self, filename, **kwargs)
_save_func_dict[form](self, filename, **kwargs)
print("Successfully saved to {}".format(form))
return
except ImportError:
print("Could not save to {} format.".format(form))
raise IOError(
"Could not save {} with the available formats. Make sure you have the necessary dependencies installed.".format(
filename
)
)

@staticmethod
def from_hdf(filename, key=None, **kwargs):
# import this lazily to avoid circular dependencies
from .readers.hdf5_reader import HDF5Reader

return HDF5Reader(filename).read(**kwargs)
return HDF5Reader(filename).read(key=key, **kwargs)

def to_hdf(self, filename, key="hatchet_graphframe", **kwargs):
# import this lazily to avoid circular dependencies
from .writers.hdf5_writer import HDF5Writer

HDF5Writer(filename).write(self, key=key, **kwargs)

@staticmethod
def from_pickle(filename, **kwargs):
from .readers.pickle_reader import PickleReader

return PickleReader(filename).read(**kwargs)

def to_pickle(self, filename, **kwargs):
from .writers.pickle_writer import PickleWriter

PickleWriter(filename).write(self, **kwargs)

@staticmethod
def from_csv(filename, **kwargs):
from .readers.csv_reader import CSVReader

return CSVReader(filename).read(**kwargs)

def to_csv(self, filename, **kwargs):
from .writers.csv_writer import CSVWriter

CSVWriter(filename).write(self, **kwargs)

@staticmethod
def from_excel(filename, **kwargs):
from .readers.excel_reader import ExcelReader

return ExcelReader(filename).read(**kwargs)

def to_excel(self, filename, **kwargs):
from .writers.excel_writer import ExcelWriter

ExcelWriter(filename).write(self, **kwargs)

def copy(self):
"""Return a shallow copy of the graphframe.

@@ -1362,6 +1496,20 @@ def __imul__(self, other):
return self._operator(other_copy, self.dataframe.mul)


_load_func_dict = {
"hdf": GraphFrame.from_hdf,
"csv": GraphFrame.from_csv,
"excel": GraphFrame.from_excel,
"pickle": GraphFrame.from_pickle,
}
_save_func_dict = {
"hdf": GraphFrame.to_hdf,
"csv": GraphFrame.to_csv,
"excel": GraphFrame.to_excel,
"pickle": GraphFrame.to_pickle,
}


class InvalidFilter(Exception):
"""Raised when an invalid argument is passed to the filter function."""

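A minimal usage sketch of the new load()/save() entry points added in this file. It assumes the branch is installed; from_lists() is an existing constructor (it appears in the hunk header above), and the node names and checkpoint filenames are placeholders.

import hatchet as ht

# Build a small GraphFrame from nested lists of node names.
gf = ht.GraphFrame.from_lists(("a", ("b", "c"), ("d", "e")))

# ".h5" maps to the HDF writer via _format_extensions; if PyTables is not
# installed, save() falls back through pickle, csv, and excel instead.
gf.save("checkpoint.h5")

# load() resolves the format the same way and raises IOError only if every
# reader's optional dependency is unavailable.
gf2 = ht.GraphFrame.load("checkpoint.h5")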
71 changes: 71 additions & 0 deletions hatchet/readers/csv_reader.py
@@ -0,0 +1,71 @@
# Copyright 2017-2021 Lawrence Livermore National Security, LLC and other
# Hatchet Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT
from __future__ import unicode_literals

from ast import literal_eval
import pandas as pd
from .dataframe_reader import DataframeReader

import pickle
import sys


def _unpickle_series_elems(pd_series):
# unpickled_elems = [pickle.loads(e.encode("utf-8")) for e in pd_series]
unpickled_elems = []
for e in pd_series:
e_bytes = e
if sys.version_info >= (3,):
e_bytes = literal_eval(e)
unpickled_elems.append(pickle.loads(e_bytes))
return pd.Series(unpickled_elems)


def _correct_children_and_parent_col_types(df):
new_children_col = []
for c in df["children"]:
if not isinstance(c, list):
new_val = literal_eval(c)
new_children_col.append(new_val)
else:
new_children_col.append(c)
df["children"] = pd.Series(new_children_col)
new_parent_col = []
for p in df["parents"]:
if not isinstance(p, list):
new_val = literal_eval(p)
new_parent_col.append(new_val)
else:
new_parent_col.append(p)
df["parents"] = pd.Series(new_parent_col)
return df


class CSVReader(DataframeReader):
def __init__(self, filename):
if sys.version_info[0] == 2:
super(CSVReader, self).__init__(filename)
else:
super().__init__(filename)

def _read_dataframe_from_file(self, **kwargs):
index_col = None
if "index_col" in kwargs:
index_col = kwargs["index_col"]
del kwargs["index_col"]
csv_df = pd.read_csv(self.filename, index_col=0, **kwargs)
csv_df["node"] = _unpickle_series_elems(csv_df["node"])
csv_df = _correct_children_and_parent_col_types(csv_df)
if index_col is not None:
return csv_df.reset_index(drop=True).set_index(index_col)
multindex_cols = ["node", "rank", "thread"]
while len(multindex_cols) > 0:
if set(multindex_cols).issubset(csv_df.columns):
return csv_df.reset_index(drop=True).set_index(multindex_cols)
multindex_cols.pop()
# TODO Replace with a custom error
raise RuntimeError("Could not generate a valid Index or MultiIndex")
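For context on _unpickle_series_elems(): the writer side (not shown in this diff) is assumed to store each Node as the text form of its pickled bytes, which is what a bytes object becomes when pandas writes it to CSV. A minimal Python 3 sketch of that round trip, using a plain dict as a stand-in for a hatchet Node:

import pickle
from ast import literal_eval

node = {"name": "main"}           # stand-in for a hatchet Node
cell = repr(pickle.dumps(node))   # what the CSV cell is assumed to hold, e.g. "b'...'"
# literal_eval() turns the text back into bytes, pickle.loads() rebuilds the object.
recovered = pickle.loads(literal_eval(cell))
assert recovered == node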
67 changes: 67 additions & 0 deletions hatchet/readers/excel_reader.py
@@ -0,0 +1,67 @@
# Copyright 2017-2021 Lawrence Livermore National Security, LLC and other
# Hatchet Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

from ast import literal_eval
import pandas as pd
from .dataframe_reader import DataframeReader

import pickle
import sys


def _unpickle_series_elems(pd_series):
unpickled_elems = []
for e in pd_series:
e_bytes = e
if sys.version_info >= (3,):
e_bytes = literal_eval(e)
unpickled_elems.append(pickle.loads(e_bytes))
return pd.Series(unpickled_elems)


def _correct_children_and_parent_col_types(df):
new_children_col = []
for c in df["children"]:
if not isinstance(c, list):
new_val = literal_eval(c)
new_children_col.append(new_val)
else:
new_children_col.append(c)
df["children"] = pd.Series(new_children_col)
new_parent_col = []
for p in df["parents"]:
if not isinstance(p, list):
new_val = literal_eval(p)
new_parent_col.append(new_val)
else:
new_parent_col.append(p)
df["parents"] = pd.Series(new_parent_col)
return df


class ExcelReader(DataframeReader):
def __init__(self, filename):
if sys.version_info[0] == 2:
super(ExcelReader, self).__init__(filename)
else:
super().__init__(filename)

def _read_dataframe_from_file(self, **kwargs):
index_col = None
if "index_col" in kwargs:
index_col = kwargs["index_col"]
del kwargs["index_col"]
csv_df = pd.read_excel(self.filename, index_col=0, **kwargs)
csv_df["node"] = _unpickle_series_elems(csv_df["node"])
csv_df = _correct_children_and_parent_col_types(csv_df)
if index_col is not None:
return csv_df.reset_index(drop=True).set_index(index_col)
multindex_cols = ["node", "rank", "thread"]
while len(multindex_cols) > 0:
if set(multindex_cols).issubset(csv_df.columns):
return csv_df.reset_index(drop=True).set_index(multindex_cols)
multindex_cols.pop()
# TODO Replace with a custom error
raise RuntimeError("Could not generate a valid Index or MultiIndex")
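Both new readers share the same index handling: an explicit index_col keyword wins, otherwise they fall back from a ("node", "rank", "thread") MultiIndex to shorter prefixes. A hedged sketch of that behavior on a toy dataframe (the columns stand in for what read_excel or read_csv would return):

import pandas as pd

df = pd.DataFrame({"node": ["main", "foo"], "rank": [0, 0], "time": [1.0, 0.5]})

index_col = None  # or e.g. ["node"] to override the default MultiIndex
if index_col is not None:
    df = df.reset_index(drop=True).set_index(index_col)
else:
    # Mirror the readers' fallback: try the longest candidate index first.
    candidates = ["node", "rank", "thread"]
    while candidates:
        if set(candidates).issubset(df.columns):
            df = df.reset_index(drop=True).set_index(candidates)
            break
        candidates.pop()

print(df.index.names)  # ['node', 'rank'] for this toy frame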
1 change: 0 additions & 1 deletion hatchet/readers/hdf5_reader.py
@@ -11,7 +11,6 @@

class HDF5Reader(DataframeReader):
def __init__(self, filename):
# TODO Remove Arguments when Python 2.7 support is dropped
if sys.version_info[0] == 2:
super(HDF5Reader, self).__init__(filename)
else:
20 changes: 20 additions & 0 deletions hatchet/readers/pickle_reader.py
@@ -0,0 +1,20 @@
# Copyright 2017-2021 Lawrence Livermore National Security, LLC and other
# Hatchet Project Developers. See the top-level LICENSE file for details.
#
# SPDX-License-Identifier: MIT

import pandas as pd
from .dataframe_reader import DataframeReader

import sys


class PickleReader(DataframeReader):
def __init__(self, filename):
if sys.version_info[0] == 2:
super(PickleReader, self).__init__(filename)
else:
super().__init__(filename)

def _read_dataframe_from_file(self, **kwargs):
return pd.read_pickle(self.filename, **kwargs)