Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"prophet",
"arima",
"lstm",
"rfregressor_volatility"
]

# Define the metrics to be used
Expand Down
46 changes: 46 additions & 0 deletions models/rfregressor_volatility/configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# pylint: disable=R0801
# Description: Configuration class for Random Forest Regressor Volatility model.
class RfregressorConfig:
    """
    Configuration class for the Random Forest Regressor Volatility model.

    Stores hyperparameters for the model and settings for data preprocessing.
    All values are plain instance attributes assigned in ``__init__``; callers
    may overwrite them on the instance before handing it to the model.
    """

    # Attribute names printed by ``display``, in output order.
    _DISPLAY_FIELDS = (
        "params",
        "candle_interval",
        "volatility_window",
        "n_lags",
        "ma_window",
        "target_shift",
        "fillna_method",
        "technical_indicators",
        "n_estimators",
        "random_state",
        "n_jobs",
    )

    def __init__(self):
        # Generic regression settings.
        # NOTE(review): these keys look like gradient-boosting (XGBoost-style)
        # options rather than RandomForestRegressor arguments - confirm whether
        # anything still reads them.
        self.params = {
            "objective": "reg:squarederror",  # Regression objective
            "eval_metric": "rmse",
        }

        # Feature calculation parameters
        self.candle_interval = '1min'  # Base timeframe for data
        self.volatility_window = '5min'  # Window for volatility calculation
        self.n_lags = 5  # Number of lags for features
        self.ma_window = 10  # Window for moving averages
        self.target_shift = 5  # Prediction horizon

        # Data preprocessing
        self.fillna_method = 'ffill'  # Method for handling missing values
        self.technical_indicators = [  # List of technical indicators to use
            'SMA_10',
            'EMA_10',
        ]

        # Model parameters
        self.n_estimators = 10
        self.random_state = 42
        self.n_jobs = 25

    def display(self):
        """Print the current configuration, one ``name: value`` line per setting."""
        print("Random Forest Regressor Volatility Configuration:")
        for name in self._DISPLAY_FIELDS:
            print(f" {name}: {getattr(self, name)}")
114 changes: 114 additions & 0 deletions models/rfregressor_volatility/model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor

from models.base_model import Model
from models.rfregressor_volatility.configs import RfregressorConfig

class RfregressorVolatilityModel(Model):
    """Random Forest Regressor model for volatility prediction.

    Input frames must provide ``close`` and ``volume`` columns. The feature
    set is: four lagged log returns, four lagged volumes, a 10-period simple
    and exponential moving average of ``close``, the standard deviation of log
    returns over the current non-overlapping window (``current_volatility``)
    and ``close`` itself. The training target is ``current_volatility``
    shifted five rows into the future.
    """

    def __init__(self, model_name="rfregressor_volatility", config=None, debug=False):
        """Create the estimator from *config*.

        Args:
            model_name: Name forwarded to the base ``Model``.
            config: Configuration object exposing ``n_estimators``,
                ``random_state``, ``n_jobs``, ``candle_interval`` and
                ``volatility_window``. A fresh ``RfregressorConfig`` is
                created when omitted.
            debug: Forwarded to the base ``Model``; also enables the
                DataFrame tail dumps printed while building training data.
        """
        super().__init__(model_name=model_name, debug=debug)
        # Build the default per instance: a default evaluated in the signature
        # would be one shared, mutable config object for every model.
        self.config = config if config is not None else RfregressorConfig()
        self._print_frames = bool(debug)
        self.model = RandomForestRegressor(
            n_estimators=self.config.n_estimators,
            random_state=self.config.random_state,
            n_jobs=self.config.n_jobs,
        )
        self.candle_interval = self.config.candle_interval
        self.volatility_window = self.config.volatility_window

    def _debug_frame(self, label: str, frame: pd.DataFrame) -> None:
        """Print the tail of *frame* when the model was created with debug=True."""
        if getattr(self, "_print_frames", False):
            print(f"{label}: {frame.tail()}")

    def _window_points(self) -> int:
        """Return how many candles make up one volatility window.

        Raises:
            ValueError: If the volatility window is shorter than one candle.
        """
        points = int(pd.Timedelta(self.volatility_window) / pd.Timedelta(self.candle_interval))
        if points < 1:
            raise ValueError(
                f"volatility_window ({self.volatility_window}) must span at least "
                f"one candle_interval ({self.candle_interval})"
            )
        return points

    def _calculate_non_overlapping_volatility(self, series: pd.Series, window_points: int) -> pd.Series:
        """Calculate volatility using non-overlapping windows.

        Each complete window of ``window_points`` consecutive rows receives
        the standard deviation of the log returns inside that window; every
        row of the window gets the same value. Rows of a trailing, incomplete
        window stay NaN.

        Raises:
            ValueError: If ``window_points`` is smaller than 1.
        """
        if window_points < 1:
            raise ValueError(f"window_points must be >= 1, got {window_points}")

        log_returns = np.log(series / series.shift(1))
        volatility = pd.Series(index=series.index, dtype=float)

        # Only start positions that leave room for a full window are visited.
        for start in range(0, len(series) - window_points + 1, window_points):
            stop = start + window_points
            volatility.iloc[start:stop] = log_returns.iloc[start:stop].std()

        return volatility

    def _build_features(self, data: pd.DataFrame):
        """Add all feature columns to a filled copy of *data*.

        Shared by training and inference so both paths see identical
        features in identical column order.

        Returns:
            Tuple ``(frame, feature_names, window_points)``. ``feature_names``
            does not include ``close``; callers append it themselves.
        """
        # Forward-fill first, then back-fill whatever is still missing at the top.
        df = data.copy().ffill().bfill()
        window_points = self._window_points()
        feature_names = []

        # NOTE(review): the lag count (4), the moving-average span (10) and the
        # target shift (5) are literals in this class, while the config object
        # also carries n_lags / ma_window / target_shift - confirm which
        # should drive the features.
        df['log_returns'] = np.log(df['close'] / df['close'].shift(1))
        for lag in range(1, 5):
            return_col = f'log_return_t-{lag}'
            df[return_col] = df['log_returns'].shift(lag)
            feature_names.append(return_col)
            volume_col = f'volume_t-{lag}'
            df[volume_col] = df['volume'].shift(lag)
            feature_names.append(volume_col)

        df['SMA_10'] = df['close'].rolling(window=10).mean()
        df['EMA_10'] = df['close'].ewm(span=10, adjust=False).mean()
        feature_names.extend(['SMA_10', 'EMA_10'])

        df['current_volatility'] = self._calculate_non_overlapping_volatility(
            df['close'], window_points
        )
        feature_names.append('current_volatility')

        return df, feature_names, window_points

    def _calculate_volatility_features(self, df: pd.DataFrame):
        """Calculate volatility features with non-overlapping windows.

        Returns:
            Tuple ``(df_sampled, list_of_features)`` where ``df_sampled``
            keeps one row per window (every ``window_points``-th row) and
            carries the ``target_volatility`` column.
        """
        df, list_of_features, window_points = self._build_features(df)

        df['target_volatility'] = df['current_volatility'].shift(-5)
        self._debug_frame("df.tail()", df)

        df_sampled = df.iloc[::window_points].copy()
        self._debug_frame("df_sampled.tail()", df_sampled)
        return df_sampled, list_of_features

    def train(self, data: pd.DataFrame):
        """Train the volatility prediction model.

        Builds the sampled feature frame, drops every row containing a NaN,
        fits the random forest and then calls ``self.save()`` (not defined in
        this class; presumably provided by the base ``Model``).

        Raises:
            ValueError: If no complete rows remain after dropping NaNs.
        """
        df_sampled, list_of_features = self._calculate_volatility_features(data)
        df_sampled = df_sampled.dropna()
        self._debug_frame("df_sampled.tail() after dropna", df_sampled)
        if df_sampled.empty:
            raise ValueError("Not enough data to train: no complete rows after dropping NaNs")

        list_of_features.append('close')
        features = df_sampled[list_of_features]
        target = df_sampled['target_volatility']

        self.model.fit(features, target)
        self.save()

    def inference(self, input_data: pd.DataFrame) -> pd.DataFrame:
        """Make predictions using the trained model.

        Every input row gets a prediction; feature values that are still NaN
        after feature construction (lag and rolling warm-up, trailing
        incomplete window) are replaced by 0 before predicting.

        Returns:
            DataFrame with a single ``prediction`` column, indexed like
            *input_data*.
        """
        df, list_of_features, _ = self._build_features(input_data)
        list_of_features.append('close')

        features = df[list_of_features].fillna(0)
        predictions = self.model.predict(features)

        return pd.DataFrame({"prediction": predictions}, index=features.index)

    def forecast(self, steps: int) -> pd.DataFrame:
        """Forecast future volatility.

        Placeholder: returns a ``forecast`` column of ``steps`` zeros and
        does not consult the fitted model.
        """
        return pd.DataFrame({"forecast": [0] * steps})