diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index 10e6809b..18f9baf9 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -45,7 +45,10 @@ jobs: python -m pip install --upgrade pip flake8 pytest pip install -r requirements.txt # Optional Dependency for HDF Checkpointing - pip install tables + python -m pip install tables + # All Possible Optional Dependencies for Excel Checkpointing + # python -m pip install XlsxWriter openpyxl pyxlsb xlrd xlwt + python -m pip install xlrd xlwt python setup.py install python setup.py build_ext --inplace python -m pip list diff --git a/hatchet/graphframe.py b/hatchet/graphframe.py index 4f3e3f85..5e722370 100644 --- a/hatchet/graphframe.py +++ b/hatchet/graphframe.py @@ -40,6 +40,24 @@ def parallel_apply(filter_function, dataframe, queue): queue.put(filtered_df) +# TODO Move into global configuration when Connor's PR is merged +_format_extensions = { + ".hdf5": "hdf", + ".hdf": "hdf", + ".h5": "hdf", + ".csv": "csv", + ".xls": "excel", + ".xlsx": "excel", + ".xlsm": "excel", + ".xlsb": "excel", + ".odf": "excel", + ".ods": "excel", + ".odt": "excel", + ".pkl": "pickle", + ".pickle": "pickle", +} + + class GraphFrame: """An input dataset is read into an object of this type, which includes a graph and a dataframe. 
@@ -299,11 +317,94 @@ def from_lists(*lists): return gf @staticmethod - def from_hdf(filename, **kwargs): + def load(filename, fileformat=None, **kwargs): + format_priority = ["hdf", "pickle", "csv", "excel"] + fformat = fileformat + if fformat is None: + # TODO + # for ext in self._format_extensions.keys(): + for ext in _format_extensions.keys(): + if filename.endswith(ext): + # TODO + # fformat = self._format_extensions[ext] + fformat = _format_extensions[ext] + break + if fformat is not None and fformat in format_priority: + format_priority.remove(fformat) + try: + # TODO + # gf = self._load_func_dict[fformat](filename, **kwargs) + gf = _load_func_dict[fformat](filename, **kwargs) + print("Successfully loaded from {}".format(fformat)) + return gf + except ImportError: + print( + "Could not load from {} format. Trying alternatives.".format( + fformat + ) + ) + for form in format_priority: + print("Trying {}".format(form)) + try: + # TODO + # gf = self._load_func_dict[form](filename, **kwargs) + gf = _load_func_dict[form](filename, **kwargs) + print("Successfully loaded from {}".format(form)) + return gf + except ImportError: + print("Could not load from {} format.".format(form)) + raise IOError( + "Could not parse {} with the available formats. 
Make sure you have the necessary dependencies installed.".format( + filename + ) + ) + + def save(self, filename, fileformat=None, **kwargs): + format_priority = ["hdf", "pickle", "csv", "excel"] + fformat = fileformat + if fformat is None: + # TODO + # for ext in self._format_extensions.keys(): + for ext in _format_extensions.keys(): + if filename.endswith(ext): + # TODO + # fformat = self._format_extensions[ext] + fformat = _format_extensions[ext] + break + if fformat is not None and fformat in format_priority: + format_priority.remove(fformat) + try: + # TODO + # self._save_func_dict[fformat](self, filename, **kwargs) + _save_func_dict[fformat](self, filename, **kwargs) + print("Successfully saved to {}".format(fformat)) + return + except ImportError: + print( + "Could not save to {} format. Trying alternatives.".format(fformat) + ) + for form in format_priority: + print("Trying {}".format(form)) + try: + # TODO + # self._save_func_dict[form](self, filename, **kwargs) + _save_func_dict[form](self, filename, **kwargs) + print("Successfully saved to {}".format(form)) + return + except ImportError: + print("Could not save to {} format.".format(form)) + raise IOError( + "Could not save {} with the available formats. 
Make sure you have the necessary dependencies installed.".format( + filename + ) + ) + + @staticmethod + def from_hdf(filename, key=None, **kwargs): # import this lazily to avoid circular dependencies from .readers.hdf5_reader import HDF5Reader - return HDF5Reader(filename).read(**kwargs) + return HDF5Reader(filename).read(key=key, **kwargs) def to_hdf(self, filename, key="hatchet_graphframe", **kwargs): # import this lazily to avoid circular dependencies @@ -311,6 +412,39 @@ def to_hdf(self, filename, key="hatchet_graphframe", **kwargs): HDF5Writer(filename).write(self, key=key, **kwargs) + @staticmethod + def from_pickle(filename, **kwargs): + from .readers.pickle_reader import PickleReader + + return PickleReader(filename).read(**kwargs) + + def to_pickle(self, filename, **kwargs): + from .writers.pickle_writer import PickleWriter + + PickleWriter(filename).write(self, **kwargs) + + @staticmethod + def from_csv(filename, **kwargs): + from .readers.csv_reader import CSVReader + + return CSVReader(filename).read(**kwargs) + + def to_csv(self, filename, **kwargs): + from .writers.csv_writer import CSVWriter + + CSVWriter(filename).write(self, **kwargs) + + @staticmethod + def from_excel(filename, **kwargs): + from .readers.excel_reader import ExcelReader + + return ExcelReader(filename).read(**kwargs) + + def to_excel(self, filename, **kwargs): + from .writers.excel_writer import ExcelWriter + + ExcelWriter(filename).write(self, **kwargs) + def copy(self): """Return a shallow copy of the graphframe. 
@@ -1362,6 +1496,20 @@ def __imul__(self, other): return self._operator(other_copy, self.dataframe.mul) +_load_func_dict = { + "hdf": GraphFrame.from_hdf, + "csv": GraphFrame.from_csv, + "excel": GraphFrame.from_excel, + "pickle": GraphFrame.from_pickle, +} +_save_func_dict = { + "hdf": GraphFrame.to_hdf, + "csv": GraphFrame.to_csv, + "excel": GraphFrame.to_excel, + "pickle": GraphFrame.to_pickle, +} + + class InvalidFilter(Exception): """Raised when an invalid argument is passed to the filter function.""" diff --git a/hatchet/readers/csv_reader.py b/hatchet/readers/csv_reader.py new file mode 100644 index 00000000..52d3b2a2 --- /dev/null +++ b/hatchet/readers/csv_reader.py @@ -0,0 +1,71 @@ +# Copyright 2017-2021 Lawrence Livermore National Security, LLC and other +# Hatchet Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT +from __future__ import unicode_literals + +from ast import literal_eval +import pandas as pd +from .dataframe_reader import DataframeReader + +import pickle +import sys + + +def _unpickle_series_elems(pd_series): + # unpickled_elems = [pickle.loads(e.encode("utf-8")) for e in pd_series] + unpickled_elems = [] + for e in pd_series: + e_bytes = e + print(sys.version_info) + print(sys.version_info >= (3,)) + if sys.version_info >= (3,): + e_bytes = literal_eval(e) + unpickled_elems.append(pickle.loads(e_bytes)) + return pd.Series(unpickled_elems) + + +def _correct_children_and_parent_col_types(df): + new_children_col = [] + for c in df["children"]: + if not isinstance(c, list): + new_val = literal_eval(c) + new_children_col.append(new_val) + else: + new_children_col.append(c) + df["children"] = pd.Series(new_children_col) + new_parent_col = [] + for p in df["parents"]: + if not isinstance(p, list): + new_val = literal_eval(p) + new_parent_col.append(new_val) + else: + new_parent_col.append(p) + df["parents"] = pd.Series(new_parent_col) + return df + + +class CSVReader(DataframeReader): + def 
__init__(self, filename): + if sys.version_info[0] == 2: + super(CSVReader, self).__init__(filename) + else: + super().__init__(filename) + + def _read_dataframe_from_file(self, **kwargs): + index_col = None + if "index_col" in kwargs: + index_col = kwargs["index_col"] + del kwargs["index_col"] + csv_df = pd.read_csv(self.filename, index_col=0, **kwargs) + csv_df["node"] = _unpickle_series_elems(csv_df["node"]) + csv_df = _correct_children_and_parent_col_types(csv_df) + if index_col is not None: + return csv_df.reset_index(drop=True).set_index(index_col) + multindex_cols = ["node", "rank", "thread"] + while len(multindex_cols) > 0: + if set(multindex_cols).issubset(csv_df.columns): + return csv_df.reset_index(drop=True).set_index(multindex_cols) + multindex_cols.pop() + # TODO Replace with a custom error + raise RuntimeError("Could not generate a valid Index or MultiIndex") diff --git a/hatchet/readers/excel_reader.py b/hatchet/readers/excel_reader.py new file mode 100644 index 00000000..13c79779 --- /dev/null +++ b/hatchet/readers/excel_reader.py @@ -0,0 +1,67 @@ +# Copyright 2017-2021 Lawrence Livermore National Security, LLC and other +# Hatchet Project Developers. See the top-level LICENSE file for details. 
+# +# SPDX-License-Identifier: MIT + +from ast import literal_eval +import pandas as pd +from .dataframe_reader import DataframeReader + +import pickle +import sys + + +def _unpickle_series_elems(pd_series): + unpickled_elems = [] + for e in pd_series: + e_bytes = e + if sys.version_info >= (3,): + e_bytes = literal_eval(e) + unpickled_elems.append(pickle.loads(e_bytes)) + return pd.Series(unpickled_elems) + + +def _correct_children_and_parent_col_types(df): + new_children_col = [] + for c in df["children"]: + if not isinstance(c, list): + new_val = literal_eval(c) + new_children_col.append(new_val) + else: + new_children_col.append(c) + df["children"] = pd.Series(new_children_col) + new_parent_col = [] + for p in df["parents"]: + if not isinstance(p, list): + new_val = literal_eval(p) + new_parent_col.append(new_val) + else: + new_parent_col.append(p) + df["parents"] = pd.Series(new_parent_col) + return df + + +class ExcelReader(DataframeReader): + def __init__(self, filename): + if sys.version_info[0] == 2: + super(ExcelReader, self).__init__(filename) + else: + super().__init__(filename) + + def _read_dataframe_from_file(self, **kwargs): + index_col = None + if "index_col" in kwargs: + index_col = kwargs["index_col"] + del kwargs["index_col"] + csv_df = pd.read_excel(self.filename, index_col=0, **kwargs) + csv_df["node"] = _unpickle_series_elems(csv_df["node"]) + csv_df = _correct_children_and_parent_col_types(csv_df) + if index_col is not None: + return csv_df.reset_index(drop=True).set_index(index_col) + multindex_cols = ["node", "rank", "thread"] + while len(multindex_cols) > 0: + if set(multindex_cols).issubset(csv_df.columns): + return csv_df.reset_index(drop=True).set_index(multindex_cols) + multindex_cols.pop() + # TODO Replace with a custom error + raise RuntimeError("Could not generate a valid Index or MultiIndex") diff --git a/hatchet/readers/hdf5_reader.py b/hatchet/readers/hdf5_reader.py index c25f2322..c3e2df75 100644 --- 
a/hatchet/readers/hdf5_reader.py +++ b/hatchet/readers/hdf5_reader.py @@ -11,7 +11,6 @@ class HDF5Reader(DataframeReader): def __init__(self, filename): - # TODO Remove Arguments when Python 2.7 support is dropped if sys.version_info[0] == 2: super(HDF5Reader, self).__init__(filename) else: diff --git a/hatchet/readers/pickle_reader.py b/hatchet/readers/pickle_reader.py new file mode 100644 index 00000000..ca516c5b --- /dev/null +++ b/hatchet/readers/pickle_reader.py @@ -0,0 +1,20 @@ +# Copyright 2017-2021 Lawrence Livermore National Security, LLC and other +# Hatchet Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT + +import pandas as pd +from .dataframe_reader import DataframeReader + +import sys + + +class PickleReader(DataframeReader): + def __init__(self, filename): + if sys.version_info[0] == 2: + super(PickleReader, self).__init__(filename) + else: + super().__init__(filename) + + def _read_dataframe_from_file(self, **kwargs): + return pd.read_pickle(self.filename, **kwargs) diff --git a/hatchet/tests/graphframe.py b/hatchet/tests/graphframe.py index 11ef42eb..3eda23d0 100644 --- a/hatchet/tests/graphframe.py +++ b/hatchet/tests/graphframe.py @@ -1176,3 +1176,130 @@ def test_hdf_load_store(mock_graph_literal): if os.path.exists("test_gframe.hdf"): os.remove("test_gframe.hdf") + + +def test_pickle_load_store(mock_graph_literal): + if os.path.exists("test_gframe.pkl"): + os.remove("test_gframe.pkl") + gf_orig = GraphFrame.from_literal(mock_graph_literal) + gf_orig.to_pickle("test_gframe.pkl") + gf_loaded = GraphFrame.from_pickle("test_gframe.pkl") + + assert gf_orig.dataframe.equals(gf_loaded.dataframe) + assert gf_orig.graph == gf_loaded.graph + + if os.path.exists("test_gframe.pkl"): + os.remove("test_gframe.pkl") + + +def test_csv_load_store(mock_graph_literal): + if os.path.exists("test_gframe.csv"): + os.remove("test_gframe.csv") + gf_orig = GraphFrame.from_literal(mock_graph_literal) + 
gf_orig.to_csv("test_gframe.csv") + gf_loaded = GraphFrame.from_csv("test_gframe.csv") + + assert gf_orig.dataframe.equals(gf_loaded.dataframe) + assert gf_orig.graph == gf_loaded.graph + + if os.path.exists("test_gframe.csv"): + os.remove("test_gframe.csv") + + +def test_excel_load_store(mock_graph_literal): + if os.path.exists("test_gframe.xls"): + os.remove("test_gframe.xls") + gf_orig = GraphFrame.from_literal(mock_graph_literal) + gf_orig.to_excel("test_gframe.xls") + gf_loaded = GraphFrame.from_excel("test_gframe.xls") + + # Excel will convert integers represented as floats back into integers. + # To ensure "equals" evaluates correctly, I manually cast the "time" and "time (inc)" + # columns back to float + gf_loaded.dataframe["time"] = gf_loaded.dataframe["time"].astype( + gf_orig.dataframe.dtypes["time"] + ) + gf_loaded.dataframe["time (inc)"] = gf_loaded.dataframe["time (inc)"].astype( + gf_orig.dataframe.dtypes["time (inc)"] + ) + + assert gf_orig.dataframe.equals(gf_loaded.dataframe) + assert gf_orig.graph == gf_loaded.graph + + if os.path.exists("test_gframe.xls"): + os.remove("test_gframe.xls") + + +def test_save_func_w_extension(mock_graph_literal): + fname = "test_gframe.hdf" + if os.path.exists(fname): + os.remove(fname) + gf_orig = GraphFrame.from_literal(mock_graph_literal) + gf_orig.save(fname) + gf_loaded = GraphFrame.from_hdf(fname) + + assert gf_orig.dataframe.equals(gf_loaded.dataframe) + assert gf_orig.graph == gf_loaded.graph + + if os.path.exists(fname): + os.remove(fname) + + +def test_save_func_w_manual_format(mock_graph_literal): + fname = "test_gframe" + if os.path.exists(fname): + os.remove(fname) + gf_orig = GraphFrame.from_literal(mock_graph_literal) + gf_orig.save(fname, fileformat="hdf") + gf_loaded = GraphFrame.from_hdf(fname) + + assert gf_orig.dataframe.equals(gf_loaded.dataframe) + assert gf_orig.graph == gf_loaded.graph + + if os.path.exists(fname): + os.remove(fname) + + +def test_load_func_w_extension(mock_graph_literal): + 
fname = "test_gframe.pkl" + if os.path.exists(fname): + os.remove(fname) + gf_orig = GraphFrame.from_literal(mock_graph_literal) + gf_orig.to_pickle(fname) + gf_loaded = GraphFrame.load(fname) + + assert gf_orig.dataframe.equals(gf_loaded.dataframe) + assert gf_orig.graph == gf_loaded.graph + + if os.path.exists(fname): + os.remove(fname) + + +def test_load_func_w_manual_format(mock_graph_literal): + fname = "test_gframe" + if os.path.exists(fname): + os.remove(fname) + gf_orig = GraphFrame.from_literal(mock_graph_literal) + gf_orig.to_hdf(fname) + gf_loaded = GraphFrame.load(fname, fileformat="hdf") + + assert gf_orig.dataframe.equals(gf_loaded.dataframe) + assert gf_orig.graph == gf_loaded.graph + + if os.path.exists(fname): + os.remove(fname) + + +def test_save_load_func_w_guessing_format(mock_graph_literal): + fname = "test_gframe" + if os.path.exists(fname): + os.remove(fname) + gf_orig = GraphFrame.from_literal(mock_graph_literal) + gf_orig.save(fname) + gf_loaded = GraphFrame.load(fname) + + assert gf_orig.dataframe.equals(gf_loaded.dataframe) + assert gf_orig.graph == gf_loaded.graph + + if os.path.exists(fname): + os.remove(fname) diff --git a/hatchet/writers/csv_writer.py b/hatchet/writers/csv_writer.py new file mode 100644 index 00000000..3e216b28 --- /dev/null +++ b/hatchet/writers/csv_writer.py @@ -0,0 +1,31 @@ +# Copyright 2017-2021 Lawrence Livermore National Security, LLC and other +# Hatchet Project Developers. See the top-level LICENSE file for details. 
+# +# SPDX-License-Identifier: MIT + +from .dataframe_writer import DataframeWriter + +import pandas as pd +import pickle + +import sys + + +def pickle_series_elems(pd_series): + pickled_elems = [pickle.dumps(e) for e in pd_series] + return pd.Series(pickled_elems) + + +class CSVWriter(DataframeWriter): + def __init__(self, filename): + if sys.version_info[0] == 2: + super(CSVWriter, self).__init__(filename) + else: + super().__init__(filename) + + def _write_dataframe_to_file(self, df, **kwargs): + df.reset_index(inplace=True) + df["node"] = pickle_series_elems(df["node"]) + df["children"] = df["children"].apply(str, convert_dtype=True) + df["parents"] = df["parents"].apply(str, convert_dtype=True) + df.to_csv(self.filename, **kwargs) diff --git a/hatchet/writers/excel_writer.py b/hatchet/writers/excel_writer.py new file mode 100644 index 00000000..4e22a4d0 --- /dev/null +++ b/hatchet/writers/excel_writer.py @@ -0,0 +1,29 @@ +# Copyright 2017-2021 Lawrence Livermore National Security, LLC and other +# Hatchet Project Developers. See the top-level LICENSE file for details. 
+# +# SPDX-License-Identifier: MIT + +from .dataframe_writer import DataframeWriter + +import pandas as pd +import pickle + +import sys + + +def pickle_series_elems(pd_series): + pickled_elems = [pickle.dumps(e) for e in pd_series] + return pd.Series(pickled_elems) + + +class ExcelWriter(DataframeWriter): + def __init__(self, filename): + if sys.version_info[0] == 2: + super(ExcelWriter, self).__init__(filename) + else: + super().__init__(filename) + + def _write_dataframe_to_file(self, df, **kwargs): + df.reset_index(inplace=True) + df["node"] = pickle_series_elems(df["node"]) + df.to_excel(self.filename, **kwargs) diff --git a/hatchet/writers/hdf5_writer.py b/hatchet/writers/hdf5_writer.py index 24250116..014df583 100644 --- a/hatchet/writers/hdf5_writer.py +++ b/hatchet/writers/hdf5_writer.py @@ -17,10 +17,6 @@ def __init__(self, filename): super().__init__(filename) def _write_dataframe_to_file(self, df, **kwargs): - if "key" not in kwargs: - raise KeyError("Writing to HDF5 requires a user-supplied key") - key = kwargs["key"] - del kwargs["key"] with warnings.catch_warnings(): warnings.filterwarnings("ignore", category=Warning) - df.to_hdf(self.filename, key, **kwargs) + df.to_hdf(self.filename, **kwargs) diff --git a/hatchet/writers/pickle_writer.py b/hatchet/writers/pickle_writer.py new file mode 100644 index 00000000..85d8765c --- /dev/null +++ b/hatchet/writers/pickle_writer.py @@ -0,0 +1,19 @@ +# Copyright 2017-2021 Lawrence Livermore National Security, LLC and other +# Hatchet Project Developers. See the top-level LICENSE file for details. +# +# SPDX-License-Identifier: MIT + +from .dataframe_writer import DataframeWriter + +import sys + + +class PickleWriter(DataframeWriter): + def __init__(self, filename): + if sys.version_info[0] == 2: + super(PickleWriter, self).__init__(filename) + else: + super().__init__(filename) + + def _write_dataframe_to_file(self, df, **kwargs): + df.to_pickle(self.filename, **kwargs)