1 change: 1 addition & 0 deletions ads/opctl/operator/lowcode/forecast/model/arima.py
@@ -151,6 +151,7 @@ def _build_model(self) -> pd.DataFrame:
horizon=self.spec.horizon,
target_column=self.original_target_column,
dt_column=self.spec.datetime_column.name,
postprocessing=self.spec.postprocessing,
)

Parallel(n_jobs=-1, require="sharedmem")(
1 change: 1 addition & 0 deletions ads/opctl/operator/lowcode/forecast/model/automlx.py
@@ -100,6 +100,7 @@ def _build_model(self) -> pd.DataFrame:
horizon=self.spec.horizon,
target_column=self.original_target_column,
dt_column=self.spec.datetime_column.name,
postprocessing=self.spec.postprocessing,
)

# Clean up kwargs for pass through
1 change: 1 addition & 0 deletions ads/opctl/operator/lowcode/forecast/model/autots.py
@@ -54,6 +54,7 @@ def _build_model(self) -> pd.DataFrame:
horizon=self.spec.horizon,
target_column=self.original_target_column,
dt_column=self.spec.datetime_column.name,
postprocessing=self.spec.postprocessing,
)
try:
model = self.loaded_models if self.loaded_models is not None else None
62 changes: 47 additions & 15 deletions ads/opctl/operator/lowcode/forecast/model/forecast_datasets.py
@@ -5,6 +5,7 @@

from typing import Dict, List

import numpy as np
import pandas as pd

from ads.opctl import logger
@@ -18,13 +19,15 @@
get_frequency_of_datetime,
)

from ..const import ForecastOutputColumns, SupportedModels, TROUBLESHOOTING_GUIDE

Review comment: We need to standardize on 1 linter. It's hard to read these PRs with all this junk.

from ..operator_config import ForecastOperatorConfig
from ..const import TROUBLESHOOTING_GUIDE, ForecastOutputColumns, SupportedModels
from ..operator_config import ForecastOperatorConfig, PostprocessingSteps


class HistoricalData(AbstractData):
def __init__(self, spec, historical_data=None, subset=None):
super().__init__(spec=spec, name="historical_data", data=historical_data, subset=subset)
super().__init__(
spec=spec, name="historical_data", data=historical_data, subset=subset
)
self.subset = subset

def _ingest_data(self, spec):
@@ -49,15 +52,19 @@ def _verify_dt_col(self, spec):
f"{SupportedModels.AutoMLX} requires data with a frequency of at least one hour. Please try using a different model,"
" or select the 'auto' option."
)
raise InvalidParameterError(f"{message}"
f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps.")
raise InvalidParameterError(
f"{message}"
f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps."
)


class AdditionalData(AbstractData):
def __init__(self, spec, historical_data, additional_data=None, subset=None):
self.subset = subset
if additional_data is not None:
super().__init__(spec=spec, name="additional_data", data=additional_data, subset=subset)
super().__init__(
spec=spec, name="additional_data", data=additional_data, subset=subset
)
self.additional_regressors = list(self.data.columns)
elif spec.additional_data is not None:
super().__init__(spec=spec, name="additional_data", subset=subset)
@@ -70,7 +77,7 @@ def __init__(self, spec, historical_data, additional_data=None, subset=None):
)
elif historical_data.get_max_time() != add_dates[-(spec.horizon + 1)]:
raise DataMismatchError(
f"The Additional Data must be present for all historical data and the entire horizon. The Historical Data ends on {historical_data.get_max_time()}. The additonal data horizon starts after {add_dates[-(spec.horizon+1)]}. These should be the same date."
f"The Additional Data must be present for all historical data and the entire horizon. The Historical Data ends on {historical_data.get_max_time()}. The additonal data horizon starts after {add_dates[-(spec.horizon + 1)]}. These should be the same date."
f"\nPlease refer to the troubleshooting guide at {TROUBLESHOOTING_GUIDE} for resolution steps."
)
else:
@@ -150,7 +157,9 @@ def __init__(
self._datetime_column_name = config.spec.datetime_column.name
self._target_col = config.spec.target_column
if historical_data is not None:
self.historical_data = HistoricalData(config.spec, historical_data, subset=subset)
self.historical_data = HistoricalData(
config.spec, historical_data, subset=subset
)
self.additional_data = AdditionalData(
config.spec, self.historical_data, additional_data, subset=subset
)
@@ -276,6 +285,7 @@ def __init__(
horizon: int,
target_column: str,
dt_column: str,
postprocessing: PostprocessingSteps,
):
"""Forecast Output contains all the details required to generate the forecast.csv output file.

@@ -285,12 +295,14 @@
horizon: int length of horizon
target_column: str the name of the original target column
dt_column: the name of the original datetime column
postprocessing: postprocessing steps to be executed
"""
self.series_id_map = {}
self._set_ci_column_names(confidence_interval_width)
self.horizon = horizon
self.target_column_name = target_column
self.dt_column_name = dt_column
self.postprocessing = postprocessing

def add_series_id(
self,
@@ -337,6 +349,12 @@ def populate_series_output(
--------
None
"""
min_threshold, max_threshold = (
self.postprocessing.set_min_forecast,
self.postprocessing.set_max_forecast,
)
if min_threshold is not None or max_threshold is not None:
np.clip(forecast_val, min_threshold, max_threshold, out=forecast_val)

Review comment: Why use "out=forecast_val" instead of "forecast_val=..."?

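An aside on the question above (a minimal sketch, not from the PR): with out=, np.clip writes the clipped values back into the existing array, mutating it in place with no new allocation, whereas plain reassignment allocates a fresh array and rebinds only the local name, leaving other references to the original unchanged.

    import numpy as np

    a = np.array([-5.0, 0.5, 7.0])
    np.clip(a, 0, 5, out=a)  # in place: `a` itself now holds the clipped values
    print(a)                 # [0.  0.5 5. ]

    b = np.array([-5.0, 0.5, 7.0])
    alias = b
    b = np.clip(b, 0, 5)     # rebinds `b` to a new array; the original is untouched
    print(alias)             # [-5.   0.5  7. ]
    print(b)                 # [0.  0.5 5. ]
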
Review comment: Let's break this out into a separate method so it's easier to find in the future. But I love the simplicity of using clip here.

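One way the suggested extraction could look (a hypothetical sketch; the method name is illustrative, not from the PR):

    def _clip_forecast(self, forecast_val: np.ndarray) -> None:
        """Clip forecast values in place to the configured postprocessing bounds."""
        min_threshold = self.postprocessing.set_min_forecast
        max_threshold = self.postprocessing.set_max_forecast
        if min_threshold is not None or max_threshold is not None:
            np.clip(forecast_val, min_threshold, max_threshold, out=forecast_val)

populate_series_output() would then replace the block above with a single self._clip_forecast(forecast_val) call.
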
try:
output_i = self.series_id_map[series_id]
except KeyError as e:
@@ -422,9 +440,9 @@ def _set_ci_column_names(self, confidence_interval_width):

def _check_forecast_format(self, forecast):
assert isinstance(forecast, pd.DataFrame)
assert (
len(forecast.columns) == 7
), f"Expected just 7 columns, but got: {forecast.columns}"
assert len(forecast.columns) == 7, (
f"Expected just 7 columns, but got: {forecast.columns}"
)
assert ForecastOutputColumns.DATE in forecast.columns
assert ForecastOutputColumns.SERIES in forecast.columns
assert ForecastOutputColumns.INPUT_VALUE in forecast.columns
@@ -506,16 +524,30 @@ def set_errors_dict(self, errors_dict: Dict):
def get_errors_dict(self):
return getattr(self, "errors_dict", None)

def merge(self, other: 'ForecastResults'):
def merge(self, other: "ForecastResults"):
"""Merge another ForecastResults object into this one."""
# Merge DataFrames if they exist, else just set
for attr in [
'forecast', 'metrics', 'test_metrics', 'local_explanations', 'global_explanations', 'model_parameters', 'models', 'errors_dict']:
"forecast",
"metrics",
"test_metrics",
"local_explanations",
"global_explanations",
"model_parameters",
"models",
"errors_dict",
]:
val_self = getattr(self, attr, None)
val_other = getattr(other, attr, None)
if val_self is not None and val_other is not None:
if isinstance(val_self, pd.DataFrame) and isinstance(val_other, pd.DataFrame):
setattr(self, attr, pd.concat([val_self, val_other], ignore_index=True, axis=0))
if isinstance(val_self, pd.DataFrame) and isinstance(
val_other, pd.DataFrame
):
setattr(
self,
attr,
pd.concat([val_self, val_other], ignore_index=True, axis=0),
)

Review comment: Assuming this is just the linter.

elif isinstance(val_self, dict) and isinstance(val_other, dict):
val_self.update(val_other)
setattr(self, attr, val_self)
1 change: 1 addition & 0 deletions ads/opctl/operator/lowcode/forecast/model/ml_forecast.py
@@ -182,6 +182,7 @@ def _build_model(self) -> pd.DataFrame:
horizon=self.spec.horizon,
target_column=self.original_target_column,
dt_column=self.date_col,
postprocessing=self.spec.postprocessing,
)
self._train_model(data_train, data_test, model_kwargs)
return self.forecast_output.get_forecast_long()
@@ -234,6 +234,7 @@ def _build_model(self) -> pd.DataFrame:
horizon=self.spec.horizon,
target_column=self.original_target_column,
dt_column=self.spec.datetime_column.name,
postprocessing=self.spec.postprocessing,
)

for i, (s_id, df) in enumerate(full_data_dict.items()):
1 change: 1 addition & 0 deletions ads/opctl/operator/lowcode/forecast/model/prophet.py
@@ -198,6 +198,7 @@ def _build_model(self) -> pd.DataFrame:
horizon=self.spec.horizon,
target_column=self.original_target_column,
dt_column=self.spec.datetime_column.name,
postprocessing=self.spec.postprocessing,

Review comment: Do we need to pass this everywhere if it's already part of "self"?

)

Parallel(n_jobs=-1, require="sharedmem")(
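On the question above, one hypothetical alternative (not part of this PR): since the _build_model implementations all pass spec-derived values, the shared base model could own the construction, for example:

    def _make_forecast_output(self) -> ForecastOutput:
        # Hypothetical base-class helper; subclasses would call this instead of
        # each constructing ForecastOutput with near-identical arguments.
        return ForecastOutput(
            confidence_interval_width=self.spec.confidence_interval_width,
            horizon=self.spec.horizon,
            target_column=self.original_target_column,
            dt_column=self.spec.datetime_column.name,
            postprocessing=self.spec.postprocessing,
        )

(ml_forecast.py passes self.date_col for dt_column, so it would still need its own override or a parameter.)
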
2 changes: 2 additions & 0 deletions ads/opctl/operator/lowcode/forecast/model_evaluator.py
@@ -149,6 +149,7 @@ def create_operator_config(
backtest_spec["output_directory"] = {"url": output_file_path}
backtest_spec["target_category_columns"] = [DataColumns.Series]
backtest_spec["generate_explanations"] = False
backtest_spec.pop('postprocessing', None)

Review comment: It's not clear to me why we'd want to pop this. Don't we want to evaluate models based on which gives the best end result? The post-processing is part of the end result.

I could imagine a case where the true values look like a sin function with a floor of 0. Backtesting may evaluate 2 options: a sin function and a const = sqrt(2). Backtesting may favour the const function, even though the sin + post-processing would have a 0 MAPE.

I think we should keep postprocessing, but let's chat if you think otherwise.
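
A rough numeric sketch of the scenario described in the comment (the constant, series, and metric are illustrative stand-ins; RMSE is used here because MAPE is undefined when actuals are 0):

    import numpy as np

    t = np.linspace(0, 4 * np.pi, 400)
    actual = np.clip(np.sin(t), 0, None)  # true series: sin with a floor of 0

    pred_sin = np.sin(t)                  # candidate A: raw sin forecast
    pred_const = np.full_like(t, 0.5)     # candidate B: a constant forecast

    def rmse(pred):
        return float(np.sqrt(np.mean((actual - pred) ** 2)))

    print(rmse(pred_sin))                    # ~0.50, penalized on the negative half-cycles
    print(rmse(pred_const))                  # ~0.43, so the constant wins the backtest...
    print(rmse(np.clip(pred_sin, 0, None)))  # 0.0 -- ...but sin + clipping is exact

If backtesting scores the raw forecasts, the constant model is chosen even though candidate A plus clipping would be a perfect fit -- the comment's argument for keeping postprocessing in the backtest config.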

cleaned_config = self.remove_none_values(backtest_op_config_draft)

backtest_op_config = ForecastOperatorConfig.from_dict(obj_dict=cleaned_config)
@@ -233,6 +234,7 @@ def find_best_model(
nonempty_metrics = {
model: metric for model, metric in metrics.items() if metric != {}
}

avg_backtests_metric = {
model: sum(value.values()) / len(value.values())
for model, value in nonempty_metrics.items()
14 changes: 14 additions & 0 deletions ads/opctl/operator/lowcode/forecast/operator_config.py
@@ -76,6 +76,14 @@ class PreprocessingSteps(DataClassSerializable):
outlier_treatment: bool = True


@dataclass(repr=True)
class PostprocessingSteps(DataClassSerializable):
"""Class representing postprocessing steps for operator."""

set_min_forecast: int = None
set_max_forecast: int = None


@dataclass(repr=True)
class DataPreprocessor(DataClassSerializable):
"""Class representing operator specification preprocessing details."""
@@ -110,6 +118,7 @@ class ForecastOperatorSpec(DataClassSerializable):
local_explanation_filename: str = None
target_column: str = None
preprocessing: DataPreprocessor = field(default_factory=DataPreprocessor)
postprocessing: PostprocessingSteps = field(default_factory=PostprocessingSteps)
datetime_column: DateTimeColumn = field(default_factory=DateTimeColumn)
target_category_columns: List[str] = field(default_factory=list)
generate_report: bool = None
@@ -146,6 +155,11 @@ def __post_init__(self):
if self.preprocessing is not None
else DataPreprocessor(enabled=True)
)
self.postprocessing = (
self.postprocessing
if self.postprocessing is not None
else PostprocessingSteps()
)

Review comment: This is more lines than the simpler

    if self.postprocessing is None:
        self.postprocessing = PostprocessingSteps()

# For Report Generation. When user doesn't specify defaults to True
self.generate_report = (
self.generate_report if self.generate_report is not None else True
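A quick sanity check of the defaults (a standalone sketch; the real class also mixes in DataClassSerializable, and its annotations use int = None rather than Optional[int]):

    from dataclasses import dataclass
    from typing import Optional

    @dataclass(repr=True)
    class PostprocessingSteps:
        set_min_forecast: Optional[int] = None
        set_max_forecast: Optional[int] = None

    steps = PostprocessingSteps()
    print(steps)  # PostprocessingSteps(set_min_forecast=None, set_max_forecast=None)
    # With both bounds left as None, populate_series_output() skips np.clip entirely.
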
15 changes: 15 additions & 0 deletions ads/opctl/operator/lowcode/forecast/schema.yaml
@@ -329,6 +329,21 @@ spec:
required: false
default: false

postprocessing:
type: dict
required: false
schema:
set_min_forecast:
Review comment: How about just "min", "max"? We don't use "set" anywhere else and "forecast" is redundant.

type: integer
required: false
meta:
description: "This can be used to define the minimum forecast in the output."
set_max_forecast:
type: integer
required: false
meta:
description: "This can be used to define the maximum forecast in the output."
Review comment: How about "Set a minimum value for the forecast" and "Set a maximum value for the forecast"?

generate_explanations:
type: boolean
required: false
@@ -137,7 +137,7 @@ Below is an example of a ``forecast.yaml`` file with every parameter specified:
- string
- No
- prophet
- Model to use. Options: prophet, arima, neuralprophet, automlx, autots, auto-select.
- Model to use. Options: prophet, arima, neuralprophet, automlx, autots, auto-select, auto-select-series.

* - model_kwargs
- dict
@@ -163,6 +163,18 @@ Below is an example of a ``forecast.yaml`` file with every parameter specified:
- false
- Handle outliers.

* - postprocessing.set_min_forecast
- integer
- No
-
- This can be used to define the minimum forecast in the output.

* - postprocessing.set_max_forecast
- integer
- No
-
- This can be used to define the maximum forecast in the output.

* - generate_explanations
- boolean
- No
@@ -266,7 +278,7 @@ Further Description
* **format**: (Optional) Specify the format for output data (e.g., ``csv``, ``json``, ``excel``).
* **options**: (Optional) Include any additional arguments, such as connection parameters for storage.

* **model**: (Optional) The name of the model framework to use. Defaults to ``auto-select``. Available options include ``arima``, ``prophet``, ``neuralprophet``, ``autots``, and ``auto-select``.
* **model**: (Optional) The name of the model framework to use. Defaults to ``prophet``. Available options include ``arima``, ``prophet``, ``neuralprophet``, ``automlx``, ``autots``, ``auto-select``, and ``auto-select-series``.

* **model_kwargs**: (Optional) A dictionary of arguments to pass directly to the model framework, allowing for detailed control over modeling.

@@ -282,6 +294,10 @@ Further Description
* **preprocessing**: (Optional) Controls preprocessing and feature engineering steps. This can be enabled or disabled using the ``enabled`` flag. The default is ``true``.
* **steps**: (Optional) Specific preprocessing steps, such as ``missing_value_imputation`` and ``outlier_treatment``, which are enabled by default.

* **postprocessing**: (Optional) Controls postprocessing steps.
* **set_min_forecast**: (Optional) This can be used to define the minimum forecast in the output.
* **set_max_forecast**: (Optional) This can be used to define the maximum forecast in the output.
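
For illustration, the same options expressed as the spec fragment a programmatic caller would build (mirroring the test at the bottom of this PR; the bounds are arbitrary examples):

    # Fragment of a forecast operator spec dict using the new postprocessing keys.
    postprocessing_spec = {
        "set_min_forecast": 0,    # floor every forecast value at 0
        "set_max_forecast": 100,  # cap every forecast value at 100
    }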

* **metric**: (Optional) The metric to select during model evaluation. Options include ``MAPE``, ``RMSE``, ``MSE``, and ``SMAPE``. The default is ``MAPE``.

* **confidence_interval_width**: (Optional) The width of the confidence interval to calculate in the forecast. The default is 0.80, indicating an 80% confidence interval.
@@ -324,4 +340,4 @@ Further Description
* **cool_down_in_seconds**: The cooldown period (in seconds) to wait before performing another scaling action.
* **scaling_metric**: The metric used for scaling actions. e.g. ``CPU_UTILIZATION`` or ``MEMORY_UTILIZATION``
* **scale_in_threshold**: The utilization percentage below which the instances will scale in (reduce).
* **scale_out_threshold**: The utilization percentage above which the instances will scale out (increase).
37 changes: 37 additions & 0 deletions tests/operators/forecast/test_datasets.py
@@ -413,5 +413,42 @@ def run_operator(
# generate_train_metrics = True


def test_postprocessing_clipping():
"""Tests the postprocessing clipping of forecast values."""
df = pd.DataFrame(
{
"Date": pd.to_datetime(pd.date_range("2023-01-01", periods=20, freq="D")),
"Y": range(0, 40, 2),
}
)

min_clip = 40

max_clip = 42

with tempfile.TemporaryDirectory() as tmpdirname:
output_data_path = f"{tmpdirname}/results"
yaml_i = deepcopy(TEMPLATE_YAML)
yaml_i["spec"]["model"] = "prophet"
yaml_i["spec"]["historical_data"].pop("url")
yaml_i["spec"]["historical_data"]["data"] = df
yaml_i["spec"]["target_column"] = "Y"
yaml_i["spec"]["datetime_column"]["name"] = DATETIME_COL
yaml_i["spec"]["horizon"] = 5
yaml_i["spec"]["output_directory"]["url"] = output_data_path
yaml_i["spec"]["postprocessing"] = {
"set_min_forecast": min_clip,
"set_max_forecast": max_clip,
}

operator_config = ForecastOperatorConfig.from_dict(yaml_i)
forecast_operate(operator_config)

forecast_df = pd.read_csv(f"{output_data_path}/forecast.csv")

assert forecast_df["forecast_value"].min() >= min_clip
assert forecast_df["forecast_value"].max() <= max_clip


if __name__ == "__main__":
pass