diff --git a/README.rst b/README.rst index 4a15556..d5fd12c 100644 --- a/README.rst +++ b/README.rst @@ -19,7 +19,7 @@ Installation timeseriescv can be installed using pip: - >>> pip install timeseriescv + >>> pip install git+https://github.com/pythagorea1/timeseriescv Content ~~~~~~~ @@ -29,6 +29,25 @@ For now the package contains two main classes handling cross-validation: * ``PurgedWalkForwardCV``: Walk-forward cross-validation with purging. * ``CombPurgedKFoldCV``: Combinatorial cross-validation with purging and embargoing. +Quick Start for CPCV +~~~~~~~~~~~~ + + >>> from timeseriescv.cross_validation import CombPurgedKFoldCV as CPCV + >>> cpcv = CPCV(n_splits=10, n_test_splits=2) + >>> for (train_set, test_set) in cpcv.split(df): + >>> train_X = df.iloc[train_set] + >>> test_X = df.iloc[test_set] + +Quick Start for WalkForward +~~~~~~~~~~~~ + >>> from timeseriescv.walkforward import PurgedWalkForwardCV as WF + >>> wf = WF(n_splits=10, n_test_splits=2, purge_count=10) + >>> for (train_set, test_set) in wf.split(df): + >>> train_X = df.iloc[train_set] + >>> test_X = df.iloc[test_set] + + + Remarks concerning the API ~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -36,8 +55,5 @@ The API is as similar to the scikit-learn API as possible. Like the scikit-learn method is a generator that yields a pair of numpy arrays containing the positional indices of the samples in the train and validation set, respectively. The main differences with the scikit-learn API are: -* The ``split`` method takes as arguments not only the predictor values ``X``, but also the prediction times ``pred_times`` and the evaluation times ``eval_times`` of each sample. -* To stay as close to the scikit-learn API as possible, this data is passed as separate parameters. But in order to ensure that they are properly aligned, ``X``, ``pred_times`` and ``eval_times`` are required to be pandas DataFrames/Series sharing the same index. - Check the docstrings of the cross-validation classes for more information. diff --git a/setup.py b/setup.py index d87c63c..05e6c15 100644 --- a/setup.py +++ b/setup.py @@ -15,9 +15,9 @@ def readme(): 'Topic :: Scientific/Engineering', ], keywords='machine-learning cross-validation scikit-learn time-series', - url='https://github.com/sam31415/timeseriescv', - author='Samuel Monnier', - author_email='samuel.monnier@gmail.com', + url='https://github.com/pythagorea1/timeseriescv', + author='Kakyo Okina', + author_email='kakyo@alpacadb.com', license='MIT', packages=['timeseriescv'], install_requires=[ diff --git a/timeseriescv/core.py b/timeseriescv/core.py new file mode 100644 index 0000000..1db97e4 --- /dev/null +++ b/timeseriescv/core.py @@ -0,0 +1,116 @@ +import numpy as np +import pandas as pd + +from abc import abstractmethod +from typing import Iterable, Tuple, List + +# fork from https://github.com/sam31415/timeseriescv + + +class BaseTimeSeriesCrossValidator: + """ + Abstract class for time series cross-validation. + Time series cross-validation requires each sample has a prediction time pred_time, + at which the features are used to predict the response, + and an evaluation time eval_time, at which the response is known and the error can be computed. + Importantly, it means that unlike in standard sklearn cross-validation, + the samples X, response y, + pred_times and eval_times must all be pandas dataframe/series having the same index. + It is also assumed that the + samples are time-ordered with respect to the prediction time (i.e. pred_times is non-decreasing). + + Parameters + ---------- + n_splits : int, default=10 + Number of folds. Must be at least 2. + + """ + + def __init__(self, n_splits=10): + n_splits = int(n_splits) + self.n_splits = n_splits + self.pred_times = None + self.eval_times = None + self.indices = None + + @abstractmethod + def split(self, X: pd.DataFrame): + self.indices = np.arange(X.shape[0]) + self.eval_times = pd.Series(X.index) + self.pred_times = pd.Series(X.index) + + +def embargo(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, + test_indices: np.ndarray, test_fold_end: int) -> np.ndarray: + """ + Apply the embargo procedure to part of the train set. + + This amounts to dropping the train set samples whose prediction time occurs within self.embargo_dt of the test + set sample evaluation times. This method applies the embargo only to the part of the training set immediately + following the end of the test set determined by test_fold_end. + + Parameters + ---------- + cv: Cross-validation class + Needs to have the attributes cv.pred_times, cv.eval_times, cv.embargo_dt and cv.indices. + + train_indices: np.ndarray + A numpy array containing all the indices of the samples currently included in the train set. + + test_indices : np.ndarray + A numpy array containing all the indices of the samples in the test set. + + test_fold_end : int + Index corresponding to the end of a test set block. + + Returns + ------- + train_indices: np.ndarray + The same array, with the indices subject to embargo removed. + + """ + if not hasattr(cv, 'embargo_td'): + raise ValueError("The passed cross-validation object should have a member cv.embargo_td defining the embargo" + "time.") + last_test_eval_time = cv.eval_times.iloc[test_indices[:test_fold_end]].max() + min_train_index = len(cv.pred_times[cv.pred_times <= last_test_eval_time + cv.embargo_td]) + if min_train_index < cv.indices.shape[0]: + allowed_indices = np.concatenate((cv.indices[:test_fold_end], cv.indices[min_train_index:])) + train_indices = np.intersect1d(train_indices, allowed_indices) + return train_indices + + +def purge(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, + test_fold_start: int, test_fold_end: int) -> np.ndarray: + """ + Purge part of the train set. + + Given a left boundary index test_fold_start of the test set, this method removes from the train set all the + samples whose evaluation time is posterior to the prediction time of the first test sample after the boundary. + + Parameters + ---------- + cv: Cross-validation class + Needs to have the attributes cv.pred_times, cv.eval_times and cv.indices. + + train_indices: np.ndarray + A numpy array containing all the indices of the samples currently included in the train set. + + test_fold_start : int + Index corresponding to the start of a test set block. + + test_fold_end : int + Index corresponding to the end of the same test set block. + + Returns + ------- + train_indices: np.ndarray + A numpy array containing the train indices purged at test_fold_start. + + """ + time_test_fold_start = cv.pred_times.iloc[test_fold_start] + # The train indices before the start of the test fold, purged. + train_indices_1 = np.intersect1d(train_indices, cv.indices[cv.eval_times < time_test_fold_start]) + # The train indices after the end of the test fold. + train_indices_2 = np.intersect1d(train_indices, cv.indices[test_fold_end:]) + return np.concatenate((train_indices_1, train_indices_2)) \ No newline at end of file diff --git a/timeseriescv/cross_validation.py b/timeseriescv/cross_validation.py index 3131777..cb6a866 100644 --- a/timeseriescv/cross_validation.py +++ b/timeseriescv/cross_validation.py @@ -1,20 +1,25 @@ import itertools as itt -import numbers import numpy as np import pandas as pd from abc import abstractmethod from typing import Iterable, Tuple, List +# fork from https://github.com/sam31415/timeseriescv + +D, N, U = -1, 0, 1 + class BaseTimeSeriesCrossValidator: """ Abstract class for time series cross-validation. - - Time series cross-validation requires each sample has a prediction time pred_time, at which the features are used to - predict the response, and an evaluation time eval_time, at which the response is known and the error can be - computed. Importantly, it means that unlike in standard sklearn cross-validation, the samples X, response y, - pred_times and eval_times must all be pandas dataframe/series having the same index. It is also assumed that the + Time series cross-validation requires each sample has a prediction time pred_time, + at which the features are used to predict the response, + and an evaluation time eval_time, at which the response is known and the error can be computed. + Importantly, it means that unlike in standard sklearn cross-validation, + the samples X, response y, + pred_times and eval_times must all be pandas dataframe/series having the same index. + It is also assumed that the samples are time-ordered with respect to the prediction time (i.e. pred_times is non-decreasing). Parameters @@ -23,213 +28,19 @@ class BaseTimeSeriesCrossValidator: Number of folds. Must be at least 2. """ + def __init__(self, n_splits=10): - if not isinstance(n_splits, numbers.Integral): - raise ValueError(f"The number of folds must be of Integral type. {n_splits} of type {type(n_splits)}" - f" was passed.") n_splits = int(n_splits) - if n_splits <= 1: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting n_splits = 2 " - f"or more, got n_splits = {n_splits}.") self.n_splits = n_splits self.pred_times = None self.eval_times = None self.indices = None @abstractmethod - def split(self, X: pd.DataFrame, y: pd.Series = None, - pred_times: pd.Series = None, eval_times: pd.Series = None): - if not isinstance(X, pd.DataFrame) and not isinstance(X, pd.Series): - raise ValueError('X should be a pandas DataFrame/Series.') - if not isinstance(y, pd.Series) and y is not None: - raise ValueError('y should be a pandas Series.') - if not isinstance(pred_times, pd.Series): - raise ValueError('pred_times should be a pandas Series.') - if not isinstance(eval_times, pd.Series): - raise ValueError('eval_times should be a pandas Series.') - if y is not None and (X.index == y.index).sum() != len(y): - raise ValueError('X and y must have the same index') - if (X.index == pred_times.index).sum() != len(pred_times): - raise ValueError('X and pred_times must have the same index') - if (X.index == eval_times.index).sum() != len(eval_times): - raise ValueError('X and eval_times must have the same index') - - self.pred_times = pred_times - self.eval_times = eval_times + def split(self, X: pd.DataFrame): self.indices = np.arange(X.shape[0]) - - -class PurgedWalkForwardCV(BaseTimeSeriesCrossValidator): - """ - Purged walk-forward cross-validation - - As described in Advances in financial machine learning, Marcos Lopez de Prado, 2018. - - The samples are decomposed into n_splits folds containing equal numbers of samples, without shuffling. In each cross - validation round, n_test_splits contiguous folds are used as the test set, while the train set consists in between - min_train_splits and max_train_splits immediately preceding folds. - - Each sample should be tagged with a prediction time pred_time and an evaluation time eval_time. The split is such - that the intervals [pred_times, eval_times] associated to samples in the train and test set do not overlap. (The - overlapping samples are dropped.) - - With split_by_times = True in the split method, it is also possible to split the samples in folds spanning equal - time intervals (using the prediction time as a time tag), instead of folds containing equal numbers of samples. - - Parameters - ---------- - n_splits : int, default=10 - Number of folds. Must be at least 2. - - n_test_splits : int, default = 1 - Number of folds used in the test set. Must be at least 1. - - min_train_splits: int, default = 2 - Minimal number of folds to be used in the train set. - - max_train_splits: int, default = None - Maximal number of folds to be used in the train set. If None, there is no upper limit. - - """ - def __init__(self, n_splits=10, n_test_splits=1, min_train_splits=2, max_train_splits=None): - super().__init__(n_splits) - if not isinstance(n_test_splits, numbers.Integral): - raise ValueError(f"The number of test folds must be of Integral type. {n_test_splits} of type " - f"{type(n_test_splits)} was passed.") - n_test_splits = int(n_test_splits) - if n_test_splits <= 0 or n_test_splits >= self.n_splits - 1: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting " - f"n_test_splits between 1 and n_splits - 1, got n_test_splits = {n_test_splits}.") - self.n_test_splits = n_test_splits - - if not isinstance(min_train_splits, numbers.Integral): - raise ValueError(f"The minimal number of train folds must be of Integral type. {min_train_splits} of type " - f"{type(min_train_splits)} was passed.") - min_train_splits = int(min_train_splits) - if min_train_splits <= 0 or min_train_splits >= self.n_splits - self.n_test_splits: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting " - f"min_train_splits between 1 and n_splits - n_test_splits, got min_train_splits = " - f"{min_train_splits}.") - self.min_train_splits = min_train_splits - - if max_train_splits is None: - max_train_splits = self.n_splits - self.n_test_splits - if not isinstance(max_train_splits, numbers.Integral): - raise ValueError(f"The maximal number of train folds must be of Integral type. {max_train_splits} of type " - f"{type(max_train_splits)} was passed.") - max_train_splits = int(max_train_splits) - if max_train_splits <= 0 or max_train_splits > self.n_splits - self.n_test_splits: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting " - f"max_train_split between 1 and n_splits - n_test_splits, got max_train_split = " - f"{max_train_splits}.") - self.max_train_splits = max_train_splits - self.fold_bounds = [] - - def split(self, X: pd.DataFrame, y: pd.Series = None, pred_times: pd.Series = None, eval_times: pd.Series = None, - split_by_time: bool = False) -> Iterable[Tuple[np.ndarray, np.ndarray]]: - """ - Yield the indices of the train and test sets. - - Although the samples are passed in the form of a pandas dataframe, the indices returned are position indices, - not labels. - - Parameters - ---------- - X : pd.DataFrame, shape (n_samples, n_features), required - Samples. Only used to extract n_samples. - - y : pd.Series, not used, inherited from _BaseKFold - - pred_times : pd.Series, shape (n_samples,), required - Times at which predictions are made. pred_times.index has to coincide with X.index. - - eval_times : pd.Series, shape (n_samples,), required - Times at which the response becomes available and the error can be computed. eval_times.index has to - coincide with X.index. - - split_by_time: bool - If False, the folds contain an (approximately) equal number of samples. If True, the folds span identical - time intervals. - - Returns - ------- - train_indices: np.ndarray - A numpy array containing all the indices in the train set. - - test_indices : np.ndarray - A numpy array containing all the indices in the test set. - - """ - super().split(X, y, pred_times, eval_times) - - # Fold boundaries - self.fold_bounds = compute_fold_bounds(self, split_by_time) - - count_folds = 0 - for fold_bound in self.fold_bounds: - if count_folds < self.min_train_splits: - count_folds = count_folds + 1 - continue - if self.n_splits - count_folds < self.n_test_splits: - break - # Computes the bounds of the test set, and the corresponding indices - test_indices = self.compute_test_set(fold_bound, count_folds) - # Computes the train set indices - train_indices = self.compute_train_set(fold_bound, count_folds) - - count_folds = count_folds + 1 - yield train_indices, test_indices - - def compute_train_set(self, fold_bound: int, count_folds: int) -> np.ndarray: - """ - Compute the position indices of samples in the train set. - - Parameters - ---------- - fold_bound : int - Bound between the train set and the test set. - - count_folds : int - The number (starting at 0) of the first fold in the test set. - - Returns - ------- - train_indices: np.ndarray - A numpy array containing all the indices in the train set. - - """ - if count_folds > self.max_train_splits: - start_train = self.fold_bounds[count_folds - self.max_train_splits] - else: - start_train = 0 - train_indices = np.arange(start_train, fold_bound) - # Purge - train_indices = purge(self, train_indices, fold_bound, self.indices[-1]) - return train_indices - - def compute_test_set(self, fold_bound: int, count_folds: int) -> np.ndarray: - """ - Compute the indices of the samples in the test set. - - Parameters - ---------- - fold_bound : int - Bound between the train set and the test set. - - count_folds : int - The number (starting at 0) of the first fold in the test set. - - Returns - ------- - test_indices: np.ndarray - A numpy array containing the test indices. - - """ - if self.n_splits - count_folds > self.n_test_splits: - end_test = self.fold_bounds[count_folds + self.n_test_splits] - else: - end_test = self.indices[-1] + 1 - return np.arange(fold_bound, end_test) + self.eval_times = pd.Series(X.index) + self.pred_times = pd.Series(X.index) class CombPurgedKFoldCV(BaseTimeSeriesCrossValidator): @@ -238,13 +49,15 @@ class CombPurgedKFoldCV(BaseTimeSeriesCrossValidator): As described in Advances in financial machine learning, Marcos Lopez de Prado, 2018. - The samples are decomposed into n_splits folds containing equal numbers of samples, without shuffling. In each cross - validation round, n_test_splits folds are used as the test set, while the other folds are used as the train set. + The samples are decomposed into n_splits folds containing equal numbers of samples, without shuffling. + In each cross validation round, n_test_splits folds are used as the test set, + while the other folds are used as the train set. There are as many rounds as n_test_splits folds among the n_splits folds. - Each sample should be tagged with a prediction time pred_time and an evaluation time eval_time. The split is such - that the intervals [pred_times, eval_times] associated to samples in the train and test set do not overlap. (The - overlapping samples are dropped.) In addition, an "embargo" period is defined, giving the minimal time between an + Each sample should be tagged with a prediction time pred_time and an evaluation time eval_time. + The split is such that the intervals [pred_times, eval_times] associated to samples + in the train and test set do not overlap. + (The overlapping samples are dropped.) In addition, an "embargo" period is defined, giving the minimal time between an evaluation time in the test set and a prediction time in the training set. This is to avoid, in the presence of temporal correlation, a contamination of the test set by the train set. @@ -260,30 +73,26 @@ class CombPurgedKFoldCV(BaseTimeSeriesCrossValidator): Embargo period (see explanations above). """ - def __init__(self, n_splits=10, n_test_splits=2, embargo_td=pd.Timedelta(minutes=0)): + + def __init__( + self, + n_splits=10, + n_test_splits=2, + embargo_td=pd.Timedelta(minutes=0), + embargo_before_td=pd.Timedelta(minutes=0), + ): super().__init__(n_splits) - if not isinstance(n_test_splits, numbers.Integral): - raise ValueError(f"The number of test folds must be of Integral type. {n_test_splits} of type " - f"{type(n_test_splits)} was passed.") n_test_splits = int(n_test_splits) - if n_test_splits <= 0 or n_test_splits > self.n_splits - 1: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting " - f"n_test_splits between 1 and n_splits - 1, got n_test_splits = {n_test_splits}.") self.n_test_splits = n_test_splits - if not isinstance(embargo_td, pd.Timedelta): - raise ValueError(f"The embargo time should be of type Pandas Timedelta. {embargo_td} of type " - f"{type(embargo_td)} was passed.") - if embargo_td < pd.Timedelta(minutes=0): - raise ValueError(f"The embargo time should be positive, got embargo = {embargo_td}.") self.embargo_td = embargo_td + self.embargo_before_td = embargo_before_td - def split(self, X: pd.DataFrame, y: pd.Series = None, - pred_times: pd.Series = None, eval_times: pd.Series = None) -> Iterable[Tuple[np.ndarray, np.ndarray]]: + def split(self, X: pd.DataFrame) -> Iterable[Tuple[np.ndarray, np.ndarray]]: """ Yield the indices of the train and test sets. - Although the samples are passed in the form of a pandas dataframe, the indices returned are position indices, - not labels. + Although the samples are passed in the form of a pandas dataframe, + the indices returned are position indices, not labels. Parameters ---------- @@ -292,12 +101,6 @@ def split(self, X: pd.DataFrame, y: pd.Series = None, y : pd.Series, not used, inherited from _BaseKFold - pred_times : pd.Series, shape (n_samples,), required - Times at which predictions are made. pred_times.index has to coincide with X.index. - - eval_times : pd.Series, shape (n_samples,), required - Times at which the response becomes available and the error can be computed. eval_times.index has to - coincide with X.index. Returns ------- @@ -308,10 +111,13 @@ def split(self, X: pd.DataFrame, y: pd.Series = None, A numpy array containing all the indices in the test set. """ - super().split(X, y, pred_times, eval_times) + super().split(X) # Fold boundaries - fold_bounds = [(fold[0], fold[-1] + 1) for fold in np.array_split(self.indices, self.n_splits)] + fold_bounds = [ + (fold[0], fold[-1] + 1) + for fold in np.array_split(self.indices, self.n_splits) + ] # List of all combinations of n_test_splits folds selected to become test sets selected_fold_bounds = list(itt.combinations(fold_bounds, self.n_test_splits)) # In order for the first round to have its whole test set at the end of the dataset @@ -325,7 +131,9 @@ def split(self, X: pd.DataFrame, y: pd.Series = None, yield train_indices, test_indices - def compute_train_set(self, test_fold_bounds: List[Tuple[int, int]], test_indices: np.ndarray) -> np.ndarray: + def compute_train_set( + self, test_fold_bounds: List[Tuple[int, int]], test_indices: np.ndarray + ) -> np.ndarray: """ Compute the position indices of samples in the train set. @@ -350,10 +158,14 @@ def compute_train_set(self, test_fold_bounds: List[Tuple[int, int]], test_indice # Purge train_indices = purge(self, train_indices, test_fold_start, test_fold_end) # Embargo - train_indices = embargo(self, train_indices, test_indices, test_fold_end) + train_indices = embargo( + self, train_indices, test_indices, test_fold_start, test_fold_end + ) return train_indices - def compute_test_set(self, fold_bound_list: List[Tuple[int, int]]) -> Tuple[List[Tuple[int, int]], np.ndarray]: + def compute_test_set( + self, fold_bound_list: List[Tuple[int, int]] + ) -> Tuple[List[Tuple[int, int]], np.ndarray]: """ Compute the indices of the samples in the test set. @@ -380,39 +192,26 @@ def compute_test_set(self, fold_bound_list: List[Tuple[int, int]]) -> Tuple[List # If the current test split is contiguous to the previous one, simply updates the endpoint elif fold_start == test_fold_bounds[-1][-1]: test_fold_bounds[-1] = (test_fold_bounds[-1][0], fold_end) - test_indices = np.union1d(test_indices, self.indices[fold_start:fold_end]).astype(int) + test_indices = np.union1d( + test_indices, self.indices[fold_start:fold_end] + ).astype(int) return test_fold_bounds, test_indices -def compute_fold_bounds(cv: BaseTimeSeriesCrossValidator, split_by_time: bool) -> List[int]: - """ - Compute a list containing the fold (left) boundaries. - - Parameters - ---------- - cv: BaseTimeSeriesCrossValidator - Cross-validation object for which the bounds need to be computed. - split_by_time: bool - If False, the folds contain an (approximately) equal number of samples. If True, the folds span identical - time intervals. - """ - if split_by_time: - full_time_span = cv.pred_times.max() - cv.pred_times.min() - fold_time_span = full_time_span / cv.n_splits - fold_bounds_times = [cv.pred_times.iloc[0] + fold_time_span * n for n in range(cv.n_splits)] - return cv.pred_times.searchsorted(fold_bounds_times) - else: - return [fold[0] for fold in np.array_split(cv.indices, cv.n_splits)] - - -def embargo(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, - test_indices: np.ndarray, test_fold_end: int) -> np.ndarray: +def embargo( + cv: BaseTimeSeriesCrossValidator, + train_indices: np.ndarray, + test_indices: np.ndarray, + test_fold_start: int, + test_fold_end: int, +) -> np.ndarray: """ Apply the embargo procedure to part of the train set. - This amounts to dropping the train set samples whose prediction time occurs within self.embargo_dt of the test - set sample evaluation times. This method applies the embargo only to the part of the training set immediately - following the end of the test set determined by test_fold_end. + This amounts to dropping the train set samples whose prediction time occurs + within self.embargo_dt of the test set sample evaluation times. This method + applies the embargo only to the part of the training set immediately following + the end of the test set determined by test_fold_end. Parameters ---------- @@ -434,24 +233,39 @@ def embargo(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, The same array, with the indices subject to embargo removed. """ - if not hasattr(cv, 'embargo_td'): - raise ValueError("The passed cross-validation object should have a member cv.embargo_td defining the embargo" - "time.") + if not hasattr(cv, "embargo_td"): + raise ValueError( + "The passed cross-validation object should have a member cv.embargo_td defining the embargo" + "time." + ) last_test_eval_time = cv.eval_times.iloc[test_indices[:test_fold_end]].max() - min_train_index = len(cv.pred_times[cv.pred_times <= last_test_eval_time + cv.embargo_td]) + first_test_eval_time = cv.eval_times.iloc[test_indices[test_fold_start:]].min() + min_train_index = len( + cv.pred_times[cv.pred_times <= last_test_eval_time + cv.embargo_td] + ) + max_train_index = len( + cv.pred_times[cv.pred_times >= first_test_eval_time - cv.embargo_before_td] + ) if min_train_index < cv.indices.shape[0]: - allowed_indices = np.concatenate((cv.indices[:test_fold_end], cv.indices[min_train_index:])) + allowed_indices = np.concatenate( + (cv.indices[:max_train_index], cv.indices[min_train_index:]) + ) train_indices = np.intersect1d(train_indices, allowed_indices) return train_indices -def purge(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, - test_fold_start: int, test_fold_end: int) -> np.ndarray: +def purge( + cv: BaseTimeSeriesCrossValidator, + train_indices: np.ndarray, + test_fold_start: int, + test_fold_end: int, +) -> np.ndarray: """ Purge part of the train set. - Given a left boundary index test_fold_start of the test set, this method removes from the train set all the - samples whose evaluation time is posterior to the prediction time of the first test sample after the boundary. + Given a left boundary index test_fold_start of the test set, + this method removes from the train set all the samples whose evaluation time + is posterior to the prediction time of the first test sample after the boundary. Parameters ---------- @@ -475,7 +289,9 @@ def purge(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, """ time_test_fold_start = cv.pred_times.iloc[test_fold_start] # The train indices before the start of the test fold, purged. - train_indices_1 = np.intersect1d(train_indices, cv.indices[cv.eval_times < time_test_fold_start]) + train_indices_1 = np.intersect1d( + train_indices, cv.indices[cv.eval_times < time_test_fold_start] + ) # The train indices after the end of the test fold. train_indices_2 = np.intersect1d(train_indices, cv.indices[test_fold_end:]) - return np.concatenate((train_indices_1, train_indices_2)) \ No newline at end of file + return np.concatenate((train_indices_1, train_indices_2)) diff --git a/timeseriescv/tests/test_cpcv.py b/timeseriescv/tests/test_cpcv.py new file mode 100644 index 0000000..446cc01 --- /dev/null +++ b/timeseriescv/tests/test_cpcv.py @@ -0,0 +1,25 @@ +import sys + +sys.path.append("../../") +from timeseriescv.cross_validation import CombPurgedKFoldCV as CPCV +import pandas as pd +import numpy as np + +periods = 7 * 24 * 60 +tidx = pd.date_range("2016-07-01", periods=periods, freq="T") +np.random.seed([3, 1415]) +data = np.random.randn(periods) +df = pd.Series(data=data, index=tidx, name="HelloTimeSeries") + + +cpcv = CPCV( + n_splits=6, + n_test_splits=2, + embargo_td=pd.Timedelta(minutes=30), + embargo_before_td=pd.Timedelta(minutes=60), +) + + +for (train_set, test_set) in cpcv.split(df): + train_X = df.iloc[train_set] + test_X = df.iloc[test_set] diff --git a/timeseriescv/walkforward.py b/timeseriescv/walkforward.py new file mode 100644 index 0000000..7edf4b4 --- /dev/null +++ b/timeseriescv/walkforward.py @@ -0,0 +1,170 @@ + +import itertools as itt +import pandas as pd +import numpy as np +from typing import Iterable, Tuple, List +from timeseriescv.core import BaseTimeSeriesCrossValidator, purge, embargo + +class PurgedWalkForwardCV(BaseTimeSeriesCrossValidator): + """ + Purged walk-forward cross-validation + + As described in Advances in financial machine learning, Marcos Lopez de Prado, 2018. + + The samples are decomposed into n_splits folds containing equal numbers of samples, without shuffling. In each cross + validation round, n_test_splits contiguous folds are used as the test set, while the train set consists in between + min_train_splits and max_train_splits immediately preceding folds. + + Each sample should be tagged with a prediction time pred_time and an evaluation time eval_time. The split is such + that the intervals [pred_times, eval_times] associated to samples in the train and test set do not overlap. (The + overlapping samples are dropped.) + + With split_by_times = True in the split method, it is also possible to split the samples in folds spanning equal + time intervals (using the prediction time as a time tag), instead of folds containing equal numbers of samples. + + Parameters + ---------- + n_splits : int, default=10 + Number of folds. Must be at least 2. + + n_test_splits : int, default = 1 + Number of folds used in the test set. Must be at least 1. + + min_train_splits: int, default = 2 + Minimal number of folds to be used in the train set. + + max_train_splits: int, default = None + Maximal number of folds to be used in the train set. If None, there is no upper limit. + + """ + def __init__(self, n_splits=10, n_test_splits=1, min_train_splits=2, purge_count=3): + super().__init__(n_splits) + self.n_test_splits = n_test_splits + self.min_train_splits = min_train_splits + self.max_train_splits = self.n_splits - self.n_test_splits + self.purge_count = purge_count + self.fold_bounds = [] + + + def split(self, X: pd.DataFrame, split_by_time: bool = False) -> Iterable[Tuple[np.ndarray, np.ndarray]]: + """ + Yield the indices of the train and test sets. + + Although the samples are passed in the form of a pandas dataframe, the indices returned are position indices, + not labels. + + Parameters + ---------- + X : pd.DataFrame, shape (n_samples, n_features), required + Samples. Only used to extract n_samples. + + split_by_time: bool + If False, the folds contain an (approximately) equal number of samples. If True, the folds span identical + time intervals. + + Returns + ------- + train_indices: np.ndarray + A numpy array containing all the indices in the train set. + + test_indices : np.ndarray + A numpy array containing all the indices in the test set. + + """ + super().split(X) + + # Fold boundaries + self.fold_bounds = compute_fold_bounds(self, split_by_time) + + count_folds = 0 + for fold_bound in self.fold_bounds: + if count_folds < self.min_train_splits: + count_folds = count_folds + 1 + continue + if self.n_splits - count_folds < self.n_test_splits: + break + # Computes the bounds of the test set, and the corresponding indices + test_indices = self.compute_test_set(fold_bound, count_folds) + test_length = len(test_indices) + # Computes the train set indices + train_indices = self.compute_train_set(fold_bound, count_folds) + train_length = len(train_indices) + train_indices = train_indices[train_length-test_length:] + test_indices = test_indices[self.purge_count:] + + count_folds = count_folds + 1 + yield train_indices, test_indices + + + def compute_train_set(self, fold_bound: int, count_folds: int) -> np.ndarray: + """ + Compute the position indices of samples in the train set. + + Parameters + ---------- + fold_bound : int + Bound between the train set and the test set. + + count_folds : int + The number (starting at 0) of the first fold in the test set. + + Returns + ------- + train_indices: np.ndarray + A numpy array containing all the indices in the train set. + + """ + if count_folds > self.max_train_splits: + start_train = self.fold_bounds[count_folds - self.max_train_splits] + else: + start_train = 0 + train_indices = np.arange(start_train, fold_bound) + # Purge + train_indices = purge(self, train_indices, fold_bound, self.indices[-1]) + return train_indices + + + def compute_test_set(self, fold_bound: int, count_folds: int) -> np.ndarray: + """ + Compute the indices of the samples in the test set. + + Parameters + ---------- + fold_bound : int + Bound between the train set and the test set. + + count_folds : int + The number (starting at 0) of the first fold in the test set. + + Returns + ------- + test_indices: np.ndarray + A numpy array containing the test indices. + + """ + if self.n_splits - count_folds > self.n_test_splits: + end_test = self.fold_bounds[count_folds + self.n_test_splits] + else: + end_test = self.indices[-1] + 1 + return np.arange(fold_bound, end_test) + +def compute_fold_bounds(cv: BaseTimeSeriesCrossValidator, split_by_time: bool) -> List[int]: + """ + Compute a list containing the fold (left) boundaries. + + Parameters + ---------- + cv: BaseTimeSeriesCrossValidator + Cross-validation object for which the bounds need to be computed. + split_by_time: bool + If False, the folds contain an (approximately) equal number of samples. If True, the folds span identical + time intervals. + """ + if split_by_time: + full_time_span = cv.pred_times.max() - cv.pred_times.min() + fold_time_span = full_time_span / cv.n_splits + fold_bounds_times = [cv.pred_times.iloc[0] + fold_time_span * n for n in range(cv.n_splits)] + return cv.pred_times.searchsorted(fold_bounds_times) + else: + return [fold[0] for fold in np.array_split(cv.indices, cv.n_splits)] +