From 9d24f835651297e3bd5c0f1a88b57a90a2ebea92 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=81=E8=8F=AF=E5=BC=B7?= Date: Tue, 17 Mar 2020 14:24:06 +0900 Subject: [PATCH 01/10] update cpcv --- setup.py | 8 +- timeseriescv/cross_validation.py | 412 +++++++++++-------------------- 2 files changed, 143 insertions(+), 277 deletions(-) diff --git a/setup.py b/setup.py index d87c63c..3130203 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ def readme(): with open('README.rst') as f: return f.read() -setup(name='timeseriescv', +setup(name='timeseriescv_', version='0.2', description='Scikit-learn style cross-validation classes for time series data', long_description=readme(), @@ -15,9 +15,9 @@ def readme(): 'Topic :: Scientific/Engineering', ], keywords='machine-learning cross-validation scikit-learn time-series', - url='https://github.com/sam31415/timeseriescv', - author='Samuel Monnier', - author_email='samuel.monnier@gmail.com', + url='https://github.com/pythagorea1/timeseriescv', + author='Kakyo Okina', + author_email='kakyo@alpacadb.com', license='MIT', packages=['timeseriescv'], install_requires=[ diff --git a/timeseriescv/cross_validation.py b/timeseriescv/cross_validation.py index 3131777..15d515f 100644 --- a/timeseriescv/cross_validation.py +++ b/timeseriescv/cross_validation.py @@ -1,20 +1,25 @@ import itertools as itt -import numbers import numpy as np import pandas as pd from abc import abstractmethod from typing import Iterable, Tuple, List +# fork from https://github.com/sam31415/timeseriescv + +D, N, U = -1, 0, 1 + class BaseTimeSeriesCrossValidator: """ Abstract class for time series cross-validation. - - Time series cross-validation requires each sample has a prediction time pred_time, at which the features are used to - predict the response, and an evaluation time eval_time, at which the response is known and the error can be - computed. 
Importantly, it means that unlike in standard sklearn cross-validation, the samples X, response y, - pred_times and eval_times must all be pandas dataframe/series having the same index. It is also assumed that the + Time series cross-validation requires each sample has a prediction time pred_time, + at which the features are used to predict the response, + and an evaluation time eval_time, at which the response is known and the error can be computed. + Importantly, it means that unlike in standard sklearn cross-validation, + the samples X, response y, + pred_times and eval_times must all be pandas dataframe/series having the same index. + It is also assumed that the samples are time-ordered with respect to the prediction time (i.e. pred_times is non-decreasing). Parameters @@ -23,213 +28,19 @@ class BaseTimeSeriesCrossValidator: Number of folds. Must be at least 2. """ + def __init__(self, n_splits=10): - if not isinstance(n_splits, numbers.Integral): - raise ValueError(f"The number of folds must be of Integral type. 
{n_splits} of type {type(n_splits)}" - f" was passed.") n_splits = int(n_splits) - if n_splits <= 1: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting n_splits = 2 " - f"or more, got n_splits = {n_splits}.") self.n_splits = n_splits self.pred_times = None self.eval_times = None self.indices = None @abstractmethod - def split(self, X: pd.DataFrame, y: pd.Series = None, - pred_times: pd.Series = None, eval_times: pd.Series = None): - if not isinstance(X, pd.DataFrame) and not isinstance(X, pd.Series): - raise ValueError('X should be a pandas DataFrame/Series.') - if not isinstance(y, pd.Series) and y is not None: - raise ValueError('y should be a pandas Series.') - if not isinstance(pred_times, pd.Series): - raise ValueError('pred_times should be a pandas Series.') - if not isinstance(eval_times, pd.Series): - raise ValueError('eval_times should be a pandas Series.') - if y is not None and (X.index == y.index).sum() != len(y): - raise ValueError('X and y must have the same index') - if (X.index == pred_times.index).sum() != len(pred_times): - raise ValueError('X and pred_times must have the same index') - if (X.index == eval_times.index).sum() != len(eval_times): - raise ValueError('X and eval_times must have the same index') - - self.pred_times = pred_times - self.eval_times = eval_times + def split(self, X: pd.DataFrame): self.indices = np.arange(X.shape[0]) - - -class PurgedWalkForwardCV(BaseTimeSeriesCrossValidator): - """ - Purged walk-forward cross-validation - - As described in Advances in financial machine learning, Marcos Lopez de Prado, 2018. - - The samples are decomposed into n_splits folds containing equal numbers of samples, without shuffling. In each cross - validation round, n_test_splits contiguous folds are used as the test set, while the train set consists in between - min_train_splits and max_train_splits immediately preceding folds. 
- - Each sample should be tagged with a prediction time pred_time and an evaluation time eval_time. The split is such - that the intervals [pred_times, eval_times] associated to samples in the train and test set do not overlap. (The - overlapping samples are dropped.) - - With split_by_times = True in the split method, it is also possible to split the samples in folds spanning equal - time intervals (using the prediction time as a time tag), instead of folds containing equal numbers of samples. - - Parameters - ---------- - n_splits : int, default=10 - Number of folds. Must be at least 2. - - n_test_splits : int, default = 1 - Number of folds used in the test set. Must be at least 1. - - min_train_splits: int, default = 2 - Minimal number of folds to be used in the train set. - - max_train_splits: int, default = None - Maximal number of folds to be used in the train set. If None, there is no upper limit. - - """ - def __init__(self, n_splits=10, n_test_splits=1, min_train_splits=2, max_train_splits=None): - super().__init__(n_splits) - if not isinstance(n_test_splits, numbers.Integral): - raise ValueError(f"The number of test folds must be of Integral type. {n_test_splits} of type " - f"{type(n_test_splits)} was passed.") - n_test_splits = int(n_test_splits) - if n_test_splits <= 0 or n_test_splits >= self.n_splits - 1: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting " - f"n_test_splits between 1 and n_splits - 1, got n_test_splits = {n_test_splits}.") - self.n_test_splits = n_test_splits - - if not isinstance(min_train_splits, numbers.Integral): - raise ValueError(f"The minimal number of train folds must be of Integral type. 
{min_train_splits} of type " - f"{type(min_train_splits)} was passed.") - min_train_splits = int(min_train_splits) - if min_train_splits <= 0 or min_train_splits >= self.n_splits - self.n_test_splits: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting " - f"min_train_splits between 1 and n_splits - n_test_splits, got min_train_splits = " - f"{min_train_splits}.") - self.min_train_splits = min_train_splits - - if max_train_splits is None: - max_train_splits = self.n_splits - self.n_test_splits - if not isinstance(max_train_splits, numbers.Integral): - raise ValueError(f"The maximal number of train folds must be of Integral type. {max_train_splits} of type " - f"{type(max_train_splits)} was passed.") - max_train_splits = int(max_train_splits) - if max_train_splits <= 0 or max_train_splits > self.n_splits - self.n_test_splits: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting " - f"max_train_split between 1 and n_splits - n_test_splits, got max_train_split = " - f"{max_train_splits}.") - self.max_train_splits = max_train_splits - self.fold_bounds = [] - - def split(self, X: pd.DataFrame, y: pd.Series = None, pred_times: pd.Series = None, eval_times: pd.Series = None, - split_by_time: bool = False) -> Iterable[Tuple[np.ndarray, np.ndarray]]: - """ - Yield the indices of the train and test sets. - - Although the samples are passed in the form of a pandas dataframe, the indices returned are position indices, - not labels. - - Parameters - ---------- - X : pd.DataFrame, shape (n_samples, n_features), required - Samples. Only used to extract n_samples. - - y : pd.Series, not used, inherited from _BaseKFold - - pred_times : pd.Series, shape (n_samples,), required - Times at which predictions are made. pred_times.index has to coincide with X.index. - - eval_times : pd.Series, shape (n_samples,), required - Times at which the response becomes available and the error can be computed. 
eval_times.index has to - coincide with X.index. - - split_by_time: bool - If False, the folds contain an (approximately) equal number of samples. If True, the folds span identical - time intervals. - - Returns - ------- - train_indices: np.ndarray - A numpy array containing all the indices in the train set. - - test_indices : np.ndarray - A numpy array containing all the indices in the test set. - - """ - super().split(X, y, pred_times, eval_times) - - # Fold boundaries - self.fold_bounds = compute_fold_bounds(self, split_by_time) - - count_folds = 0 - for fold_bound in self.fold_bounds: - if count_folds < self.min_train_splits: - count_folds = count_folds + 1 - continue - if self.n_splits - count_folds < self.n_test_splits: - break - # Computes the bounds of the test set, and the corresponding indices - test_indices = self.compute_test_set(fold_bound, count_folds) - # Computes the train set indices - train_indices = self.compute_train_set(fold_bound, count_folds) - - count_folds = count_folds + 1 - yield train_indices, test_indices - - def compute_train_set(self, fold_bound: int, count_folds: int) -> np.ndarray: - """ - Compute the position indices of samples in the train set. - - Parameters - ---------- - fold_bound : int - Bound between the train set and the test set. - - count_folds : int - The number (starting at 0) of the first fold in the test set. - - Returns - ------- - train_indices: np.ndarray - A numpy array containing all the indices in the train set. - - """ - if count_folds > self.max_train_splits: - start_train = self.fold_bounds[count_folds - self.max_train_splits] - else: - start_train = 0 - train_indices = np.arange(start_train, fold_bound) - # Purge - train_indices = purge(self, train_indices, fold_bound, self.indices[-1]) - return train_indices - - def compute_test_set(self, fold_bound: int, count_folds: int) -> np.ndarray: - """ - Compute the indices of the samples in the test set. 
- - Parameters - ---------- - fold_bound : int - Bound between the train set and the test set. - - count_folds : int - The number (starting at 0) of the first fold in the test set. - - Returns - ------- - test_indices: np.ndarray - A numpy array containing the test indices. - - """ - if self.n_splits - count_folds > self.n_test_splits: - end_test = self.fold_bounds[count_folds + self.n_test_splits] - else: - end_test = self.indices[-1] + 1 - return np.arange(fold_bound, end_test) + self.eval_times = pd.Series(X.index) + self.pred_times = pd.Series(X.index) class CombPurgedKFoldCV(BaseTimeSeriesCrossValidator): @@ -238,13 +49,15 @@ class CombPurgedKFoldCV(BaseTimeSeriesCrossValidator): As described in Advances in financial machine learning, Marcos Lopez de Prado, 2018. - The samples are decomposed into n_splits folds containing equal numbers of samples, without shuffling. In each cross - validation round, n_test_splits folds are used as the test set, while the other folds are used as the train set. + The samples are decomposed into n_splits folds containing equal numbers of samples, without shuffling. + In each cross validation round, n_test_splits folds are used as the test set, + while the other folds are used as the train set. There are as many rounds as n_test_splits folds among the n_splits folds. - Each sample should be tagged with a prediction time pred_time and an evaluation time eval_time. The split is such - that the intervals [pred_times, eval_times] associated to samples in the train and test set do not overlap. (The - overlapping samples are dropped.) In addition, an "embargo" period is defined, giving the minimal time between an + Each sample should be tagged with a prediction time pred_time and an evaluation time eval_time. + The split is such that the intervals [pred_times, eval_times] associated to samples + in the train and test set do not overlap. + (The overlapping samples are dropped.) 
In addition, an "embargo" period is defined, giving the minimal time between an evaluation time in the test set and a prediction time in the training set. This is to avoid, in the presence of temporal correlation, a contamination of the test set by the train set. @@ -260,30 +73,21 @@ class CombPurgedKFoldCV(BaseTimeSeriesCrossValidator): Embargo period (see explanations above). """ - def __init__(self, n_splits=10, n_test_splits=2, embargo_td=pd.Timedelta(minutes=0)): + + def __init__( + self, n_splits=10, n_test_splits=2, embargo_td=pd.Timedelta(minutes=0) + ): super().__init__(n_splits) - if not isinstance(n_test_splits, numbers.Integral): - raise ValueError(f"The number of test folds must be of Integral type. {n_test_splits} of type " - f"{type(n_test_splits)} was passed.") n_test_splits = int(n_test_splits) - if n_test_splits <= 0 or n_test_splits > self.n_splits - 1: - raise ValueError(f"K-fold cross-validation requires at least one train/test split by setting " - f"n_test_splits between 1 and n_splits - 1, got n_test_splits = {n_test_splits}.") self.n_test_splits = n_test_splits - if not isinstance(embargo_td, pd.Timedelta): - raise ValueError(f"The embargo time should be of type Pandas Timedelta. {embargo_td} of type " - f"{type(embargo_td)} was passed.") - if embargo_td < pd.Timedelta(minutes=0): - raise ValueError(f"The embargo time should be positive, got embargo = {embargo_td}.") self.embargo_td = embargo_td - def split(self, X: pd.DataFrame, y: pd.Series = None, - pred_times: pd.Series = None, eval_times: pd.Series = None) -> Iterable[Tuple[np.ndarray, np.ndarray]]: + def split(self, X: pd.DataFrame) -> Iterable[Tuple[np.ndarray, np.ndarray]]: """ Yield the indices of the train and test sets. - Although the samples are passed in the form of a pandas dataframe, the indices returned are position indices, - not labels. + Although the samples are passed in the form of a pandas dataframe, + the indices returned are position indices, not labels. 
Parameters ---------- @@ -292,12 +96,6 @@ def split(self, X: pd.DataFrame, y: pd.Series = None, y : pd.Series, not used, inherited from _BaseKFold - pred_times : pd.Series, shape (n_samples,), required - Times at which predictions are made. pred_times.index has to coincide with X.index. - - eval_times : pd.Series, shape (n_samples,), required - Times at which the response becomes available and the error can be computed. eval_times.index has to - coincide with X.index. Returns ------- @@ -308,10 +106,13 @@ def split(self, X: pd.DataFrame, y: pd.Series = None, A numpy array containing all the indices in the test set. """ - super().split(X, y, pred_times, eval_times) + super().split(X) # Fold boundaries - fold_bounds = [(fold[0], fold[-1] + 1) for fold in np.array_split(self.indices, self.n_splits)] + fold_bounds = [ + (fold[0], fold[-1] + 1) + for fold in np.array_split(self.indices, self.n_splits) + ] # List of all combinations of n_test_splits folds selected to become test sets selected_fold_bounds = list(itt.combinations(fold_bounds, self.n_test_splits)) # In order for the first round to have its whole test set at the end of the dataset @@ -325,7 +126,9 @@ def split(self, X: pd.DataFrame, y: pd.Series = None, yield train_indices, test_indices - def compute_train_set(self, test_fold_bounds: List[Tuple[int, int]], test_indices: np.ndarray) -> np.ndarray: + def compute_train_set( + self, test_fold_bounds: List[Tuple[int, int]], test_indices: np.ndarray + ) -> np.ndarray: """ Compute the position indices of samples in the train set. 
@@ -353,7 +156,9 @@ def compute_train_set(self, test_fold_bounds: List[Tuple[int, int]], test_indice train_indices = embargo(self, train_indices, test_indices, test_fold_end) return train_indices - def compute_test_set(self, fold_bound_list: List[Tuple[int, int]]) -> Tuple[List[Tuple[int, int]], np.ndarray]: + def compute_test_set( + self, fold_bound_list: List[Tuple[int, int]] + ) -> Tuple[List[Tuple[int, int]], np.ndarray]: """ Compute the indices of the samples in the test set. @@ -380,39 +185,25 @@ def compute_test_set(self, fold_bound_list: List[Tuple[int, int]]) -> Tuple[List # If the current test split is contiguous to the previous one, simply updates the endpoint elif fold_start == test_fold_bounds[-1][-1]: test_fold_bounds[-1] = (test_fold_bounds[-1][0], fold_end) - test_indices = np.union1d(test_indices, self.indices[fold_start:fold_end]).astype(int) + test_indices = np.union1d( + test_indices, self.indices[fold_start:fold_end] + ).astype(int) return test_fold_bounds, test_indices -def compute_fold_bounds(cv: BaseTimeSeriesCrossValidator, split_by_time: bool) -> List[int]: - """ - Compute a list containing the fold (left) boundaries. - - Parameters - ---------- - cv: BaseTimeSeriesCrossValidator - Cross-validation object for which the bounds need to be computed. - split_by_time: bool - If False, the folds contain an (approximately) equal number of samples. If True, the folds span identical - time intervals. 
- """ - if split_by_time: - full_time_span = cv.pred_times.max() - cv.pred_times.min() - fold_time_span = full_time_span / cv.n_splits - fold_bounds_times = [cv.pred_times.iloc[0] + fold_time_span * n for n in range(cv.n_splits)] - return cv.pred_times.searchsorted(fold_bounds_times) - else: - return [fold[0] for fold in np.array_split(cv.indices, cv.n_splits)] - - -def embargo(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, - test_indices: np.ndarray, test_fold_end: int) -> np.ndarray: +def embargo( + cv: BaseTimeSeriesCrossValidator, + train_indices: np.ndarray, + test_indices: np.ndarray, + test_fold_end: int, +) -> np.ndarray: """ Apply the embargo procedure to part of the train set. - This amounts to dropping the train set samples whose prediction time occurs within self.embargo_dt of the test - set sample evaluation times. This method applies the embargo only to the part of the training set immediately - following the end of the test set determined by test_fold_end. + This amounts to dropping the train set samples whose prediction time occurs + within cv.embargo_td of the test set sample evaluation times. This method + applies the embargo only to the part of the training set immediately following + the end of the test set determined by test_fold_end. Parameters ---------- @@ -434,24 +225,35 @@ def embargo(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, The same array, with the indices subject to embargo removed. """ - if not hasattr(cv, 'embargo_td'): - raise ValueError("The passed cross-validation object should have a member cv.embargo_td defining the embargo" - "time.") + if not hasattr(cv, "embargo_td"): + raise ValueError( + "The passed cross-validation object should have a member cv.embargo_td defining the embargo" + "time."
+ ) last_test_eval_time = cv.eval_times.iloc[test_indices[:test_fold_end]].max() - min_train_index = len(cv.pred_times[cv.pred_times <= last_test_eval_time + cv.embargo_td]) + min_train_index = len( + cv.pred_times[cv.pred_times <= last_test_eval_time + cv.embargo_td] + ) if min_train_index < cv.indices.shape[0]: - allowed_indices = np.concatenate((cv.indices[:test_fold_end], cv.indices[min_train_index:])) + allowed_indices = np.concatenate( + (cv.indices[:test_fold_end], cv.indices[min_train_index:]) + ) train_indices = np.intersect1d(train_indices, allowed_indices) return train_indices -def purge(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, - test_fold_start: int, test_fold_end: int) -> np.ndarray: +def purge( + cv: BaseTimeSeriesCrossValidator, + train_indices: np.ndarray, + test_fold_start: int, + test_fold_end: int, +) -> np.ndarray: """ Purge part of the train set. - Given a left boundary index test_fold_start of the test set, this method removes from the train set all the - samples whose evaluation time is posterior to the prediction time of the first test sample after the boundary. + Given a left boundary index test_fold_start of the test set, + this method removes from the train set all the samples whose evaluation time + is posterior to the prediction time of the first test sample after the boundary. Parameters ---------- @@ -475,7 +277,71 @@ def purge(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, """ time_test_fold_start = cv.pred_times.iloc[test_fold_start] # The train indices before the start of the test fold, purged. - train_indices_1 = np.intersect1d(train_indices, cv.indices[cv.eval_times < time_test_fold_start]) + train_indices_1 = np.intersect1d( + train_indices, cv.indices[cv.eval_times < time_test_fold_start] + ) # The train indices after the end of the test fold. 
+ train_indices_2 = np.intersect1d(train_indices, cv.indices[test_fold_end:]) - return np.concatenate((train_indices_1, train_indices_2)) \ No newline at end of file + return np.concatenate((train_indices_1, train_indices_2)) + + +def evaluate( + x, + y, + label, + model, + lossfunc, + n_splits=6, + n_test_splits=2, + embargo_td=pd.Timedelta(minutes=10), +): + """ + Args: + x (pd.DataFrame) : data of features + y (pd.DataFrame) : data of labels + label (pd.DataFrame) : answer labels. + model : ensemble model + n_splits (int) : the number of groups + default 6. + n_test_splits (int) : the number of test groups + default 2. + embargo_td (pd.Timedelta) : Embargo time. + Embargo is a loss between current time and observation time. + default pd.Timedelta(minutes=10). + lossfunc : loss function. + """ + + cv = CombPurgedKFoldCV( + n_splits=n_splits, n_test_splits=n_test_splits, embargo_td=embargo_td + ) + + losses = [] + + for train_set, test_set in cv.split(x): + + train_x = x.iloc[train_set] + train_y = y.iloc[train_set] + test_x = x.iloc[test_set] + test_y = y.iloc[test_set] + + model.fit(train_x.values, train_y.values.ravel()) + prob = model.transform(test_x) + + preds = test_y.copy() + preds.loc[:, "up"] = prob[:, 2] + preds.loc[:, "neutral"] = prob[:, 1] + preds.loc[:, "down"] = prob[:, 0] + + preds.loc[:, "label_pred"] = ( + np.argmax(preds[["down", "neutral", "up"]].values, axis=1) - 1 + ) + + preds.loc[:, "label_diff"] = label.label_diff + preds.loc[:, "label_res"] = label.label_res + preds.loc[:, "pl"] = label.label_diff + preds.loc[preds.label_pred == D, "pl"] *= -1 + preds.loc[preds.label_pred == N, "pl"] *= 0 + + losses.append(lossfunc(preds)) + return losses + From d8e519515b7eaf8cfff9b43ca1adc463fa934b3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=81=E8=8F=AF=E5=BC=B7?= Date: Tue, 17 Mar 2020 14:44:52 +0900 Subject: [PATCH 02/10] Fix name --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index
3130203..05e6c15 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ def readme(): with open('README.rst') as f: return f.read() -setup(name='timeseriescv_', +setup(name='timeseriescv', version='0.2', description='Scikit-learn style cross-validation classes for time series data', long_description=readme(), From 5e6b543279cfea1d3a442fde65dab7726e152b6e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=81=E8=8F=AF=E5=BC=B7?= Date: Thu, 19 Mar 2020 18:03:38 +0900 Subject: [PATCH 03/10] Add walkforward framework --- timeseriescv/core.py | 116 ++++++++++++++++++++++++ timeseriescv/walkforward.py | 170 ++++++++++++++++++++++++++++++++++++ 2 files changed, 286 insertions(+) create mode 100644 timeseriescv/core.py create mode 100644 timeseriescv/walkforward.py diff --git a/timeseriescv/core.py b/timeseriescv/core.py new file mode 100644 index 0000000..1db97e4 --- /dev/null +++ b/timeseriescv/core.py @@ -0,0 +1,116 @@ +import numpy as np +import pandas as pd + +from abc import abstractmethod +from typing import Iterable, Tuple, List + +# fork from https://github.com/sam31415/timeseriescv + + +class BaseTimeSeriesCrossValidator: + """ + Abstract class for time series cross-validation. + Time series cross-validation requires each sample has a prediction time pred_time, + at which the features are used to predict the response, + and an evaluation time eval_time, at which the response is known and the error can be computed. + Importantly, it means that unlike in standard sklearn cross-validation, + the samples X, response y, + pred_times and eval_times must all be pandas dataframe/series having the same index. + It is also assumed that the + samples are time-ordered with respect to the prediction time (i.e. pred_times is non-decreasing). + + Parameters + ---------- + n_splits : int, default=10 + Number of folds. Must be at least 2. 
+ + """ + + def __init__(self, n_splits=10): + n_splits = int(n_splits) + self.n_splits = n_splits + self.pred_times = None + self.eval_times = None + self.indices = None + + @abstractmethod + def split(self, X: pd.DataFrame): + self.indices = np.arange(X.shape[0]) + self.eval_times = pd.Series(X.index) + self.pred_times = pd.Series(X.index) + + +def embargo(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, + test_indices: np.ndarray, test_fold_end: int) -> np.ndarray: + """ + Apply the embargo procedure to part of the train set. + + This amounts to dropping the train set samples whose prediction time occurs within cv.embargo_td of the test + set sample evaluation times. This method applies the embargo only to the part of the training set immediately + following the end of the test set determined by test_fold_end. + + Parameters + ---------- + cv: Cross-validation class + Needs to have the attributes cv.pred_times, cv.eval_times, cv.embargo_td and cv.indices. + + train_indices: np.ndarray + A numpy array containing all the indices of the samples currently included in the train set. + + test_indices : np.ndarray + A numpy array containing all the indices of the samples in the test set. + + test_fold_end : int + Index corresponding to the end of a test set block. + + Returns + ------- + train_indices: np.ndarray + The same array, with the indices subject to embargo removed.
+ + """ + if not hasattr(cv, 'embargo_td'): + raise ValueError("The passed cross-validation object should have a member cv.embargo_td defining the embargo" + "time.") + last_test_eval_time = cv.eval_times.iloc[test_indices[:test_fold_end]].max() + min_train_index = len(cv.pred_times[cv.pred_times <= last_test_eval_time + cv.embargo_td]) + if min_train_index < cv.indices.shape[0]: + allowed_indices = np.concatenate((cv.indices[:test_fold_end], cv.indices[min_train_index:])) + train_indices = np.intersect1d(train_indices, allowed_indices) + return train_indices + + +def purge(cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, + test_fold_start: int, test_fold_end: int) -> np.ndarray: + """ + Purge part of the train set. + + Given a left boundary index test_fold_start of the test set, this method removes from the train set all the + samples whose evaluation time is posterior to the prediction time of the first test sample after the boundary. + + Parameters + ---------- + cv: Cross-validation class + Needs to have the attributes cv.pred_times, cv.eval_times and cv.indices. + + train_indices: np.ndarray + A numpy array containing all the indices of the samples currently included in the train set. + + test_fold_start : int + Index corresponding to the start of a test set block. + + test_fold_end : int + Index corresponding to the end of the same test set block. + + Returns + ------- + train_indices: np.ndarray + A numpy array containing the train indices purged at test_fold_start. + + """ + time_test_fold_start = cv.pred_times.iloc[test_fold_start] + # The train indices before the start of the test fold, purged. + train_indices_1 = np.intersect1d(train_indices, cv.indices[cv.eval_times < time_test_fold_start]) + # The train indices after the end of the test fold. 
+ train_indices_2 = np.intersect1d(train_indices, cv.indices[test_fold_end:]) + return np.concatenate((train_indices_1, train_indices_2)) \ No newline at end of file diff --git a/timeseriescv/walkforward.py b/timeseriescv/walkforward.py new file mode 100644 index 0000000..ed4f731 --- /dev/null +++ b/timeseriescv/walkforward.py @@ -0,0 +1,170 @@ + +import itertools as itt +import pandas as pd +import numpy as np +from typing import Iterable, Tuple, List +from core import BaseTimeSeriesCrossValidator, purge, embargo + +class PurgedWalkForwardCV(BaseTimeSeriesCrossValidator): + """ + Purged walk-forward cross-validation + + As described in Advances in financial machine learning, Marcos Lopez de Prado, 2018. + + The samples are decomposed into n_splits folds containing equal numbers of samples, without shuffling. In each cross + validation round, n_test_splits contiguous folds are used as the test set, while the train set consists in between + min_train_splits and max_train_splits immediately preceding folds. + + Each sample should be tagged with a prediction time pred_time and an evaluation time eval_time. The split is such + that the intervals [pred_times, eval_times] associated to samples in the train and test set do not overlap. (The + overlapping samples are dropped.) + + With split_by_times = True in the split method, it is also possible to split the samples in folds spanning equal + time intervals (using the prediction time as a time tag), instead of folds containing equal numbers of samples. + + Parameters + ---------- + n_splits : int, default=10 + Number of folds. Must be at least 2. + + n_test_splits : int, default = 1 + Number of folds used in the test set. Must be at least 1. + + min_train_splits: int, default = 2 + Minimal number of folds to be used in the train set. + + max_train_splits: int, default = None + Maximal number of folds to be used in the train set. If None, there is no upper limit. 
+ + """ + def __init__(self, n_splits=10, n_test_splits=1, min_train_splits=2, purge_count=3): + super().__init__(n_splits) + self.n_test_splits = n_test_splits + self.min_train_splits = min_train_splits + self.max_train_splits = self.n_splits - self.n_test_splits + self.purge_count = purge_count + self.fold_bounds = [] + + + def split(self, X: pd.DataFrame, split_by_time: bool = False) -> Iterable[Tuple[np.ndarray, np.ndarray]]: + """ + Yield the indices of the train and test sets. + + Although the samples are passed in the form of a pandas dataframe, the indices returned are position indices, + not labels. + + Parameters + ---------- + X : pd.DataFrame, shape (n_samples, n_features), required + Samples. Only used to extract n_samples. + + split_by_time: bool + If False, the folds contain an (approximately) equal number of samples. If True, the folds span identical + time intervals. + + Returns + ------- + train_indices: np.ndarray + A numpy array containing all the indices in the train set. + + test_indices : np.ndarray + A numpy array containing all the indices in the test set. 
+ + """ + super().split(X) + + # Fold boundaries + self.fold_bounds = compute_fold_bounds(self, split_by_time) + + count_folds = 0 + for fold_bound in self.fold_bounds: + if count_folds < self.min_train_splits: + count_folds = count_folds + 1 + continue + if self.n_splits - count_folds < self.n_test_splits: + break + # Computes the bounds of the test set, and the corresponding indices + test_indices = self.compute_test_set(fold_bound, count_folds) + test_length = len(test_indices) + # Computes the train set indices + train_indices = self.compute_train_set(fold_bound, count_folds) + train_length = len(train_indices) + train_indices = train_indices[train_length-test_length:] + test_indices = test_indices[self.purge_count:] + + count_folds = count_folds + 1 + yield train_indices, test_indices + + + def compute_train_set(self, fold_bound: int, count_folds: int) -> np.ndarray: + """ + Compute the position indices of samples in the train set. + + Parameters + ---------- + fold_bound : int + Bound between the train set and the test set. + + count_folds : int + The number (starting at 0) of the first fold in the test set. + + Returns + ------- + train_indices: np.ndarray + A numpy array containing all the indices in the train set. + + """ + if count_folds > self.max_train_splits: + start_train = self.fold_bounds[count_folds - self.max_train_splits] + else: + start_train = 0 + train_indices = np.arange(start_train, fold_bound) + # Purge + train_indices = purge(self, train_indices, fold_bound, self.indices[-1]) + return train_indices + + + def compute_test_set(self, fold_bound: int, count_folds: int) -> np.ndarray: + """ + Compute the indices of the samples in the test set. + + Parameters + ---------- + fold_bound : int + Bound between the train set and the test set. + + count_folds : int + The number (starting at 0) of the first fold in the test set. + + Returns + ------- + test_indices: np.ndarray + A numpy array containing the test indices. 
+ + """ + if self.n_splits - count_folds > self.n_test_splits: + end_test = self.fold_bounds[count_folds + self.n_test_splits] + else: + end_test = self.indices[-1] + 1 + return np.arange(fold_bound, end_test) + +def compute_fold_bounds(cv: BaseTimeSeriesCrossValidator, split_by_time: bool) -> List[int]: + """ + Compute a list containing the fold (left) boundaries. + + Parameters + ---------- + cv: BaseTimeSeriesCrossValidator + Cross-validation object for which the bounds need to be computed. + split_by_time: bool + If False, the folds contain an (approximately) equal number of samples. If True, the folds span identical + time intervals. + """ + if split_by_time: + full_time_span = cv.pred_times.max() - cv.pred_times.min() + fold_time_span = full_time_span / cv.n_splits + fold_bounds_times = [cv.pred_times.iloc[0] + fold_time_span * n for n in range(cv.n_splits)] + return cv.pred_times.searchsorted(fold_bounds_times) + else: + return [fold[0] for fold in np.array_split(cv.indices, cv.n_splits)] + From 20dce2e1feb8477373a677f63ba1095d936fda94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=81=E8=8F=AF=E5=BC=B7?= Date: Thu, 19 Mar 2020 18:10:50 +0900 Subject: [PATCH 04/10] Add new tutorial --- README.rst | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/README.rst b/README.rst index 4a15556..620c9eb 100644 --- a/README.rst +++ b/README.rst @@ -19,7 +19,7 @@ Installation timeseriescv can be installed using pip: - >>> pip install timeseriescv + >>> pip install git+https://github.com/pythagorea1/timeseriescv Content ~~~~~~~ @@ -29,6 +29,17 @@ For now the package contains two main classes handling cross-validation: * ``PurgedWalkForwardCV``: Walk-forward cross-validation with purging. * ``CombPurgedKFoldCV``: Combinatorial cross-validation with purging and embargoing. 
+Quick Start +~~~~~~~~~~~~ + + >>> from timeseriescv.cross_validation import CombPurgedKFoldCV as CPCV + >>> cpcv = CPCV(n_splits=10, n_test_splits=2) + >>> for (train_set, test_set) in cpcv.split(df): + >>> train_X = df.iloc[train_set] + >>> test_X = df.iloc[test_set] + + + Remarks concerning the API ~~~~~~~~~~~~~~~~~~~~~~~~~~ From bdcfbf5c3f79b2acdef2a535e608d52ef9fa65e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=81=E8=8F=AF=E5=BC=B7?= Date: Thu, 19 Mar 2020 18:13:56 +0900 Subject: [PATCH 05/10] Add explanation for Walkforward --- README.rst | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/README.rst b/README.rst index 620c9eb..9c9d502 100644 --- a/README.rst +++ b/README.rst @@ -29,7 +29,7 @@ For now the package contains two main classes handling cross-validation: * ``PurgedWalkForwardCV``: Walk-forward cross-validation with purging. * ``CombPurgedKFoldCV``: Combinatorial cross-validation with purging and embargoing. -Quick Start +Quick Start for CPCV ~~~~~~~~~~~~ >>> from timeseriescv.cross_validation import CombPurgedKFoldCV as CPCV @@ -38,6 +38,14 @@ Quick Start >>> train_X = df.iloc[train_set] >>> test_X = df.iloc[test_set] +Quick Start for WalkForward +~~~~~~~~~~~~ + >>> from timeseriescv.walkforward import PurgedWalkForwardCV as WF + >>> wf = WF(n_splits=10, n_test_splits=2, purge_count=10) + >>> for (train_set, test_set) in wf.split(df): + >>> train_X = df.iloc[train_set] + >>> test_X = df.iloc[test_set] + Remarks concerning the API @@ -48,7 +56,6 @@ method is a generator that yields a pair of numpy arrays containing the position and validation set, respectively. The main differences with the scikit-learn API are: * The ``split`` method takes as arguments not only the predictor values ``X``, but also the prediction times ``pred_times`` and the evaluation times ``eval_times`` of each sample. -* To stay as close to the scikit-learn API as possible, this data is passed as separate parameters. 
But in order to ensure that they are properly aligned, ``X``, ``pred_times`` and ``eval_times`` are required to be pandas DataFrames/Series sharing the same index. Check the docstrings of the cross-validation classes for more information. From 9a6998ed29226664d8ee175ae5a3b6158dbf6840 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=81=E8=8F=AF=E5=BC=B7?= Date: Thu, 19 Mar 2020 18:27:41 +0900 Subject: [PATCH 06/10] Update phrases --- README.rst | 2 -- 1 file changed, 2 deletions(-) diff --git a/README.rst b/README.rst index 9c9d502..d5fd12c 100644 --- a/README.rst +++ b/README.rst @@ -55,7 +55,5 @@ The API is as similar to the scikit-learn API as possible. Like the scikit-learn method is a generator that yields a pair of numpy arrays containing the positional indices of the samples in the train and validation set, respectively. The main differences with the scikit-learn API are: -* The ``split`` method takes as arguments not only the predictor values ``X``, but also the prediction times ``pred_times`` and the evaluation times ``eval_times`` of each sample. - Check the docstrings of the cross-validation classes for more information. From 9097b52332df743eb2be5a5d6b066bfe38e66811 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=81=E8=8F=AF=E5=BC=B7?= Date: Mon, 23 Mar 2020 11:20:15 +0900 Subject: [PATCH 07/10] Remove evaluate() --- timeseriescv/cross_validation.py | 62 -------------------------------- 1 file changed, 62 deletions(-) diff --git a/timeseriescv/cross_validation.py b/timeseriescv/cross_validation.py index 15d515f..de4d530 100644 --- a/timeseriescv/cross_validation.py +++ b/timeseriescv/cross_validation.py @@ -283,65 +283,3 @@ def purge( # The train indices after the end of the test fold. 
train_indices_2 = np.intersect1d(train_indices, cv.indices[test_fold_end:]) return np.concatenate((train_indices_1, train_indices_2)) - - -def evaluate( - x, - y, - label, - model, - lossfunc, - n_splits=6, - n_test_splits=2, - embargo_td=pd.Timedelta(minutes=10), -): - """ - Args: - x (pd.DataFrame) : data of features - y (pd.DataFrame) : data of labels - label (pd.DataFrame) : answer labels. - model : emsemble model - n_splits (int) : the number of groups - default 6. - n_test_splits (int) : the number of test groups - default 2. - embargo_td (pd.Timedelta) : Embargo time. - Embargo is a loss between current time and observation time. - default pd.Timedelta(minutes=10). - lossfunc : loss function. - """ - - cv = CombPurgedKFoldCV( - n_splits=n_splits, n_test_splits=n_test_splits, embargo_td=embargo_td - ) - - losses = [] - - for train_set, test_set in cv.split(x): - - train_x = x.iloc[train_set] - train_y = y.iloc[train_set] - test_x = x.iloc[test_set] - test_y = y.iloc[test_set] - - model.fit(train_x.values, train_y.values.ravel()) - prob = model.transform(test_x) - - preds = test_y.copy() - preds.loc[:, "up"] = prob[:, 2] - preds.loc[:, "neutral"] = prob[:, 1] - preds.loc[:, "down"] = prob[:, 0] - - preds.loc[:, "label_pred"] = ( - np.argmax(preds[["down", "neutral", "up"]].values, axis=1) - 1 - ) - - preds.loc[:, "label_diff"] = label.label_diff - preds.loc[:, "label_res"] = label.label_res - preds.loc[:, "pl"] = label.label_diff - preds.loc[preds.label_pred == D, "pl"] *= -1 - preds.loc[preds.label_pred == N, "pl"] *= 0 - - losses.append(lossfunc(preds)) - return losses - From 7e6521a1cb84b8db47d246c5b32a01cc8fb8a314 Mon Sep 17 00:00:00 2001 From: "k.o" Date: Mon, 23 Mar 2020 11:24:42 +0900 Subject: [PATCH 08/10] Update cross_validation.py Remove evaluate method --- timeseriescv/cross_validation.py | 62 -------------------------------- 1 file changed, 62 deletions(-) diff --git a/timeseriescv/cross_validation.py b/timeseriescv/cross_validation.py index 
15d515f..de4d530 100644 --- a/timeseriescv/cross_validation.py +++ b/timeseriescv/cross_validation.py @@ -283,65 +283,3 @@ def purge( # The train indices after the end of the test fold. train_indices_2 = np.intersect1d(train_indices, cv.indices[test_fold_end:]) return np.concatenate((train_indices_1, train_indices_2)) - - -def evaluate( - x, - y, - label, - model, - lossfunc, - n_splits=6, - n_test_splits=2, - embargo_td=pd.Timedelta(minutes=10), -): - """ - Args: - x (pd.DataFrame) : data of features - y (pd.DataFrame) : data of labels - label (pd.DataFrame) : answer labels. - model : emsemble model - n_splits (int) : the number of groups - default 6. - n_test_splits (int) : the number of test groups - default 2. - embargo_td (pd.Timedelta) : Embargo time. - Embargo is a loss between current time and observation time. - default pd.Timedelta(minutes=10). - lossfunc : loss function. - """ - - cv = CombPurgedKFoldCV( - n_splits=n_splits, n_test_splits=n_test_splits, embargo_td=embargo_td - ) - - losses = [] - - for train_set, test_set in cv.split(x): - - train_x = x.iloc[train_set] - train_y = y.iloc[train_set] - test_x = x.iloc[test_set] - test_y = y.iloc[test_set] - - model.fit(train_x.values, train_y.values.ravel()) - prob = model.transform(test_x) - - preds = test_y.copy() - preds.loc[:, "up"] = prob[:, 2] - preds.loc[:, "neutral"] = prob[:, 1] - preds.loc[:, "down"] = prob[:, 0] - - preds.loc[:, "label_pred"] = ( - np.argmax(preds[["down", "neutral", "up"]].values, axis=1) - 1 - ) - - preds.loc[:, "label_diff"] = label.label_diff - preds.loc[:, "label_res"] = label.label_res - preds.loc[:, "pl"] = label.label_diff - preds.loc[preds.label_pred == D, "pl"] *= -1 - preds.loc[preds.label_pred == N, "pl"] *= 0 - - losses.append(lossfunc(preds)) - return losses - From d4944caa05f1504c55d37e9f7bcd15d768265966 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=81=E8=8F=AF=E5=BC=B7?= Date: Tue, 24 Mar 2020 14:58:06 +0900 Subject: [PATCH 09/10] Update module --- 
timeseriescv/walkforward.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/timeseriescv/walkforward.py b/timeseriescv/walkforward.py index ed4f731..7edf4b4 100644 --- a/timeseriescv/walkforward.py +++ b/timeseriescv/walkforward.py @@ -3,7 +3,7 @@ import pandas as pd import numpy as np from typing import Iterable, Tuple, List -from core import BaseTimeSeriesCrossValidator, purge, embargo +from timeseriescv.core import BaseTimeSeriesCrossValidator, purge, embargo class PurgedWalkForwardCV(BaseTimeSeriesCrossValidator): """ From 00342a2c6652b25c2565a6bae2a879c3db1cd8e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BF=81=E8=8F=AF=E5=BC=B7?= Date: Mon, 7 Sep 2020 16:15:02 +0900 Subject: [PATCH 10/10] FIX CPCV to adjust embargo before and after test set --- timeseriescv/cross_validation.py | 18 +++++++++++++++--- timeseriescv/tests/test_cpcv.py | 25 +++++++++++++++++++++++++ 2 files changed, 40 insertions(+), 3 deletions(-) create mode 100644 timeseriescv/tests/test_cpcv.py diff --git a/timeseriescv/cross_validation.py b/timeseriescv/cross_validation.py index de4d530..cb6a866 100644 --- a/timeseriescv/cross_validation.py +++ b/timeseriescv/cross_validation.py @@ -75,12 +75,17 @@ class CombPurgedKFoldCV(BaseTimeSeriesCrossValidator): """ def __init__( - self, n_splits=10, n_test_splits=2, embargo_td=pd.Timedelta(minutes=0) + self, + n_splits=10, + n_test_splits=2, + embargo_td=pd.Timedelta(minutes=0), + embargo_before_td=pd.Timedelta(minutes=0), ): super().__init__(n_splits) n_test_splits = int(n_test_splits) self.n_test_splits = n_test_splits self.embargo_td = embargo_td + self.embargo_before_td = embargo_before_td def split(self, X: pd.DataFrame) -> Iterable[Tuple[np.ndarray, np.ndarray]]: """ @@ -153,7 +158,9 @@ def compute_train_set( # Purge train_indices = purge(self, train_indices, test_fold_start, test_fold_end) # Embargo - train_indices = embargo(self, train_indices, test_indices, test_fold_end) + train_indices = embargo( + self, train_indices, 
test_indices, test_fold_start, test_fold_end + ) return train_indices def compute_test_set( @@ -195,6 +202,7 @@ def embargo( cv: BaseTimeSeriesCrossValidator, train_indices: np.ndarray, test_indices: np.ndarray, + test_fold_start: int, test_fold_end: int, ) -> np.ndarray: """ @@ -231,12 +239,16 @@ def embargo( "time." ) last_test_eval_time = cv.eval_times.iloc[test_indices[:test_fold_end]].max() + first_test_eval_time = cv.eval_times.iloc[test_indices[test_fold_start:]].min() min_train_index = len( cv.pred_times[cv.pred_times <= last_test_eval_time + cv.embargo_td] ) + max_train_index = len( + cv.pred_times[cv.pred_times >= first_test_eval_time - cv.embargo_before_td] + ) if min_train_index < cv.indices.shape[0]: allowed_indices = np.concatenate( - (cv.indices[:test_fold_end], cv.indices[min_train_index:]) + (cv.indices[:max_train_index], cv.indices[min_train_index:]) ) train_indices = np.intersect1d(train_indices, allowed_indices) return train_indices diff --git a/timeseriescv/tests/test_cpcv.py b/timeseriescv/tests/test_cpcv.py new file mode 100644 index 0000000..446cc01 --- /dev/null +++ b/timeseriescv/tests/test_cpcv.py @@ -0,0 +1,25 @@ +import sys + +sys.path.append("../../") +from timeseriescv.cross_validation import CombPurgedKFoldCV as CPCV +import pandas as pd +import numpy as np + +periods = 7 * 24 * 60 +tidx = pd.date_range("2016-07-01", periods=periods, freq="T") +np.random.seed([3, 1415]) +data = np.random.randn(periods) +df = pd.Series(data=data, index=tidx, name="HelloTimeSeries") + + +cpcv = CPCV( + n_splits=6, + n_test_splits=2, + embargo_td=pd.Timedelta(minutes=30), + embargo_before_td=pd.Timedelta(minutes=60), +) + + +for (train_set, test_set) in cpcv.split(df): + train_X = df.iloc[train_set] + test_X = df.iloc[test_set]