Sktime and Pyspi Integration #82

Draft
wants to merge 4 commits into main
242 changes: 112 additions & 130 deletions pyspi/base.py
@@ -1,150 +1,132 @@
# Refactored base.py, creating a unified public API for the PySPI library.

# Importing the libraries
import os
import pandas as pd
import numpy as np

from skbase.base import BaseObject
from pyspi.data import Data
import warnings, copy

"""
Some parsing functions for decorating so that we can either input the time series directly or use the data structure
"""
def parse_univariate(function):
    def parsed_function(self, data, i=None, inplace=True):
        if not isinstance(data, Data):
            data1 = data
            data = Data(data=data1)
        elif not inplace:
            # Ensure we don't write over the original
            data = copy.deepcopy(data)

        if i is None:
            if data.n_processes == 1:
                i = 0
            else:
                raise ValueError('Require argument i to be set.')

        return function(self, data, i=i)

    return parsed_function

def parse_bivariate(function):
    def parsed_function(self, data, data2=None, i=None, j=None, inplace=True):
        if not isinstance(data, Data):
            if data2 is None:
                raise TypeError('Input must be either a pyspi.data object or two 1D-array inputs.'
                                f' Received {type(data)} and {type(data2)}.')
            data1 = data
            data = Data()
            data.add_process(data1)
            data.add_process(data2)
        elif not inplace:
            # Ensure we don't write over the original
            data = copy.deepcopy(data)

        if i is None and j is None:
            if data.n_processes == 2:
                i, j = 0, 1
            else:
                warnings.warn('i and j not set.')

        return function(self, data, i=i, j=j)

    return parsed_function

def parse_multivariate(function):
    def parsed_function(self, data, inplace=True):
        if not isinstance(data, Data):
            # Create a pyspi.Data object from an iterable of processes
            try:
                procs = data
                data = Data()
                for p in procs:
                    data.add_process(p)
            except (TypeError, IndexError):
                raise TypeError('Data must be either a pyspi.data.Data object or an iterable of numpy.ndarrays.')
        elif not inplace:
            # Ensure we don't write over the original
            data = copy.deepcopy(data)

        return function(self, data)

    return parsed_function
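For illustration, here is a minimal sketch (not part of this diff) of how these decorators are used: a decorated bivariate method accepts either two 1-D arrays or a pre-built pyspi Data object. ToyCovariance and the to_numpy accessor are assumptions made for the example.

import numpy as np
from pyspi.data import Data
from pyspi.base import parse_bivariate  # assumes the decorators above remain importable

class ToyCovariance:
    # Hypothetical SPI used only to demonstrate the decorator
    @parse_bivariate
    def bivariate(self, data, i=None, j=None):
        z = data.to_numpy(squeeze=True)  # assumed pyspi Data accessor
        return float(np.cov(z[i], z[j])[0, 1])

x, y = np.random.randn(100), np.random.randn(100)
toy = ToyCovariance()
toy.bivariate(x, y)  # two 1-D arrays: the decorator wraps them in a Data object
toy.bivariate(Data(np.vstack([x, y])), i=0, j=1)  # or pass a Data object directly (processes-by-samples layout assumed)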

class Directed:
    """ Base class for directed statistics
    """

    name = 'Bivariate base class'
    identifier = 'bivariate_base'
    labels = ['signed']
    @parse_bivariate
    def bivariate(self, data, i=None, j=None):
        """ Overload method for getting the pairwise dependencies
        """
        raise NotImplementedError("Method not yet overloaded.")

    @parse_multivariate
    def multivariate(self, data):
        """ Compute the dependency statistics for the entire multivariate dataset
        """
        A = np.empty((data.n_processes, data.n_processes))
        A[:] = np.nan

        for j in range(data.n_processes):
            for i in [ii for ii in range(data.n_processes) if ii != j]:
                A[i, j] = self.bivariate(data, i=i, j=j)
        return A

    def get_group(self, classes):
        for i, i_cls in enumerate(classes):
            for j, j_cls in enumerate(classes):
                if i == j:
                    continue
                assert not set(i_cls).issubset(set(j_cls)), (f'Class {i_cls} is a subset of class {j_cls}.')

        self._group = None
        self._group_name = None

        labset = set(self.labels)
        matches = [set(cls).issubset(labset) for cls in classes]

        if np.count_nonzero(matches) > 1:
            warnings.warn(f'More than one match for classes {classes}')
        else:
            try:
                id = np.where(matches)[0][0]
                self._group = id
                self._group_name = ', '.join(classes[id])
                return self._group, self._group_name
            except (TypeError, IndexError):
                pass
# Base Class
class BaseSPI(BaseObject):
    """
    Base class for PySPI. This class provides a unified public API for the PySPI library.
    """
    _tags = {
        "capability-multivariate": True,
        "capability-univariate": True,
        "capability-bivariate": True,
        "python_dependencies": "sktime",
        "issigned": True,
        "identifier": "base",
        "name": "BasePySPI",
    }

    # With these tags we don't need to separately handle the parsing of
    # univariate, bivariate and multivariate data.
    # TODO: better to add a deprecation warning here.

    # defining the methods

    def _spi(self, data: Data, i: int = 0) -> float:
        raise NotImplementedError("Subclass must implement this method")

    def spi(self, data, i=None):
        if not isinstance(data, Data):
            data = Data(data)
        if i is None:
            i = 0
        return self._spi(data, i)

    def _spi_mat(self, data: Data, data2: Data = None, i: int = None, j: int = None) -> np.ndarray:
        raise NotImplementedError("Subclass must implement this method")

    def spi_mat(self, data: Data, data2: Data = None, i=None, j=None):
        # logic yet to be implemented
        if not isinstance(data, Data):
            data = Data(data)
        if data2 is not None and not isinstance(data2, Data):
            data2 = Data(data2)
        return self._spi_mat(data, data2, i, j)

    def get_group(self, classes):
        warnings.warn(
            "The 'get_group' method is deprecated. Use skbase's tagging system instead.",
            DeprecationWarning,
        )
        labset = set(self.get_tags()["labels"])
        matches = [set(cls).issubset(labset) for cls in classes]

        if np.count_nonzero(matches) > 1:
            warnings.warn(f"More than one match for classes {classes}")
        elif np.count_nonzero(matches) == 1:
            try:
                idx = np.where(matches)[0][0]
                return idx, ", ".join(classes[idx])
            except (TypeError, IndexError):
                pass
        return None
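As context for the tag-based design, a short sketch of how skbase tags are queried once an estimator inherits from BaseSPI; MySPI and its tag values are illustrative only, not part of this diff.

from pyspi.base import BaseSPI  # assumes the refactored module on this branch

class MySPI(BaseSPI):
    # illustrative override; merged with the tags inherited from BaseSPI
    _tags = {"issigned": False, "name": "MySPI"}

BaseSPI.get_class_tag("python_dependencies")  # -> "sktime"
MySPI.get_class_tag("issigned")               # -> False, overrides the base value
MySPI().get_tags()                            # full, merged tag dictionary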

class Undirected(Directed):
    """ Base class for undirected statistics
    """

    name = 'Base class'
    identifier = 'base'
    labels = ['unsigned']

    def ispositive(self):
        return False

    @parse_multivariate
    def multivariate(self, data):
        A = super(Undirected, self).multivariate(data)
        li = np.tril_indices(data.n_processes, -1)
        A[li] = A.T[li]
        return A


class SignedSPI(BaseSPI):
    _tags = {
        "capability-signed": True
    }


class DirectedSPI(BaseSPI):
    _tags = {
        "capability-directed": True
    }

    def _spi_mat(self, data, data2=None, i=None, j=None) -> np.ndarray:
        n_processes1 = data.n_processes
        n_processes2 = data2.n_processes if data2 is not None else n_processes1
        A = np.empty((n_processes1, n_processes2))
        A[:] = np.nan

        for col in range(n_processes2):
            for row in range(n_processes1):
                if row != col:  # Typically directed measures are off-diagonal
                    A[row, col] = self._compute_directed_pair(data, data2, row, col)  # Placeholder

        # Select a single row, column or entry if requested
        if i is not None and j is not None:
            A = A[i, j]
        elif i is not None:
            A = A[i, :]
        elif j is not None:
            A = A[:, j]
        return A

    def _compute_directed_pair(self, data, data2, i, j):
        # Placeholder for the actual computation
        raise NotImplementedError("_compute_directed_pair must be implemented.")
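A minimal sketch of how a concrete directed measure could plug into this template; the LaggedCovariance class, its lag-1 cross-covariance statistic and the to_numpy accessor are assumptions for illustration, not part of this PR.

import numpy as np
from pyspi.base import DirectedSPI  # assumes the refactored module on this branch

class LaggedCovariance(DirectedSPI):
    _tags = {"name": "LaggedCovariance"}

    def _compute_directed_pair(self, data, data2, i, j):
        # data2 is ignored in this single-dataset sketch
        z = data.to_numpy(squeeze=True)  # assumed pyspi Data accessor
        return float(np.cov(z[i][:-1], z[j][1:])[0, 1])  # process i leading j by one sample

spi = LaggedCovariance()
A = spi.spi_mat(np.random.randn(4, 200))  # 4x4 matrix with NaN on the diagonal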

class UndirectedSPI(BaseSPI):
    _tags = {
        "capability:multivariate": True,
    }

    def _spi_mat(self, data, data2=None, i=None, j=None) -> np.ndarray:
        n_processes1 = data.n_processes
        n_processes2 = data2.n_processes if data2 is not None else n_processes1
        A = np.empty((n_processes1, n_processes2))
        A[:] = np.nan

        for col in range(n_processes2):
            for row in range(n_processes1):
                A[row, col] = self._compute_undirected_pair(data, data2, row, col)  # Placeholder

        # Ensure symmetry for undirected measures
        li = np.tril_indices(n_processes1, -1)
        A[li] = A.T[li]

        # Select a single row, column or entry if requested
        if i is not None and j is not None:
            A = A[i, j]
        elif i is not None:
            A = A[i, :]
        elif j is not None:
            A = A[:, j]
        return A

    def _compute_undirected_pair(self, data, data2, i, j):
        # Placeholder for the actual computation
        raise NotImplementedError("_compute_undirected_pair must be implemented.")
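A matching undirected sketch, also showing the i/j selection behaviour of spi_mat; PearsonToy and the to_numpy accessor are illustrative assumptions only.

import numpy as np
from pyspi.base import UndirectedSPI  # assumes the refactored module on this branch

class PearsonToy(UndirectedSPI):
    def _compute_undirected_pair(self, data, data2, i, j):
        z = data.to_numpy(squeeze=True)  # assumed pyspi Data accessor
        return float(np.corrcoef(z[i], z[j])[0, 1])

est = PearsonToy()
X = np.random.randn(5, 300)
A = est.spi_mat(X)              # full 5x5 symmetric matrix
row = est.spi_mat(X, i=2)       # a single row
one = est.spi_mat(X, i=2, j=3)  # a single pairwise value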



class Signed:
    """ Base class for signed SPIs
    """
    def issigned(self):
        return True


class Unsigned:
    """ Base class for unsigned SPIs
    """
    def issigned(self):
        return False
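For comparison, a short sketch of how the query answered by these mixins maps onto the new tag system; MyUnsignedSPI is illustrative only.

from pyspi.base import BaseSPI  # assumes the refactored module on this branch

class MyUnsignedSPI(BaseSPI):
    _tags = {"issigned": False}

# old style: mixin method answered spi.issigned()
# new style: the same information is a declarative tag
MyUnsignedSPI.get_class_tag("issigned")  # -> False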