# Databricks notebook source
# MAGIC %md
# MAGIC This notebook downloads the [M4 dataset](https://www.sciencedirect.com/science/article/pii/S0169207019301128) using an open source package: [`datasetsforecast`](https://github.com/Nixtla/datasetsforecast/tree/main/). The M4 dataset is a large and diverse collection of time series data used for benchmarking and evaluating the performance of forecasting methods. It is part of the [M-competition](https://forecasters.org/resources/time-series-data/) series, which are organized competitions aimed at comparing the accuracy and robustness of different forecasting methods.
# MAGIC
# MAGIC This notebook is run by the other example notebooks using the `%run` command.
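# MAGIC
# MAGIC As a minimal sketch (the relative path and the widget values below are illustrative, not taken from this repo), a caller notebook would invoke it and pass the widget values like this:
# MAGIC
# MAGIC ```
# MAGIC %run ./data_preparation $catalog="mmf" $db="m4" $n="100"
# MAGIC ```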
# COMMAND ----------
# MAGIC %md
# MAGIC ## Install package
# COMMAND ----------
# MAGIC %pip install datasetsforecast==0.0.8 --quiet
# COMMAND ----------
dbutils.library.restartPython()
# COMMAND ----------
# MAGIC %md
# MAGIC ## Set the logging level
# COMMAND ----------
import pathlib
import pandas as pd
from datasetsforecast.m4 import M4
import logging
logger = spark._jvm.org.apache.log4j
# Setting the logging level to ERROR for the "py4j.java_gateway" logger
# This reduces the verbosity of the logs by only showing error messages
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)
logging.getLogger("py4j.clientserver").setLevel(logging.ERROR)
# COMMAND ----------
# MAGIC %md
# MAGIC ## Create a catalog and a database
# MAGIC We create a catalog and a database (schema) to store the Delta tables for our data.
# COMMAND ----------
# Creating a text widget for the catalog name input
dbutils.widgets.text("catalog", "")
# Creating a text widget for the database (schema) name input
dbutils.widgets.text("db", "")
# Creating a text widget for the number of time series to sample input
dbutils.widgets.text("n", "")
catalog = dbutils.widgets.get("catalog") # Name of the catalog we use to manage our assets
db = dbutils.widgets.get("db") # Name of the schema we use to store assets
n = int(dbutils.widgets.get("n")) # Number of time series to sample
# Ensure the catalog exists, create it if it does not
_ = spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
# Ensure the schema exists within the specified catalog, create it if it does not
_ = spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{db}")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Daily M4 data
# MAGIC Below are some custom functions that convert the downloaded M4 time series into a daily format: each sampled series is truncated to at most 1,020 observations and reindexed onto a daily calendar starting on 2020-01-01. The parameter `n` specifies the number of time series to sample for your dataset.
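# MAGIC
# MAGIC To make the reindexing concrete, here is a toy pandas sketch (illustrative values, not M4 data; the 1,020-row truncation is omitted for brevity) of what `transform_group_daily` does to a single series:
# MAGIC
# MAGIC ```python
# MAGIC import pandas as pd
# MAGIC
# MAGIC toy = pd.DataFrame({"unique_id": ["D1"] * 3, "y": [10.0, 11.0, 12.0]})
# MAGIC # Re-anchor the observations onto a daily calendar starting 2020-01-01
# MAGIC idx = pd.date_range("2020-01-01", periods=len(toy), freq="D", name="ds")
# MAGIC out = pd.DataFrame(index=idx).reset_index()
# MAGIC out["unique_id"] = toy.unique_id.iloc[0]
# MAGIC out["y"] = toy.y.values
# MAGIC # out: ds = 2020-01-01 .. 2020-01-03, unique_id = "D1", y = 10.0, 11.0, 12.0
# MAGIC ```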
# COMMAND ----------
def create_m4_daily():
    # Load the M4 daily dataset
    y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Daily")
    # Create a list of unique IDs for the n time series we want to sample (D1 ... Dn)
    _ids = [f"D{i}" for i in range(1, n + 1)]
    # Filter and transform the dataset based on the unique IDs
    y_df = (
        y_df.groupby("unique_id")
        .filter(lambda x: x.unique_id.iloc[0] in _ids)
        .groupby("unique_id")
        .apply(transform_group_daily)
        .reset_index(drop=True)
    )
    return y_df


def transform_group_daily(df):
    unique_id = df.unique_id.iloc[0]  # Get the unique ID of the current group
    if len(df) > 1020:
        df = df.iloc[-1020:]  # Limit the data to the last 1020 entries if longer
    _start = pd.Timestamp("2020-01-01")  # Start date for the transformed data
    _end = _start + pd.DateOffset(days=len(df) - 1)  # End date for the transformed data
    date_idx = pd.date_range(start=_start, end=_end, freq="D", name="ds")  # Generate the date range
    res_df = pd.DataFrame(data=[], index=date_idx).reset_index()  # Create an empty DataFrame with the date range
    res_df["unique_id"] = unique_id  # Add the unique ID column
    res_df["y"] = df.y.values  # Add the target variable column
    return res_df


(
    spark.createDataFrame(create_m4_daily())  # Create a Spark DataFrame from the transformed data
    .write.format("delta").mode("overwrite")  # Write the DataFrame to Delta format, overwriting any existing data
    .saveAsTable(f"{catalog}.{db}.m4_daily_train")  # Save the table in the specified catalog and schema
)
# Print a confirmation message
print(f"Saved data to {catalog}.{db}.m4_daily_train")
# COMMAND ----------
# MAGIC %md
# MAGIC ## Monthly M4 data
# MAGIC In our example notebooks, we primarily use daily time series. However, if you want to experiment with monthly time series, use the `m4_monthly_train` table generated by the following command.
# COMMAND ----------
def create_m4_monthly():
    # Load the M4 monthly dataset
    y_df, _, _ = M4.load(directory=str(pathlib.Path.home()), group="Monthly")
    # Create a list of unique IDs for the time series we want to sample
    _ids = [f"M{i}" for i in range(1, n + 1)]
    # Filter and transform the dataset based on the unique IDs
    y_df = (
        y_df.groupby("unique_id")
        .filter(lambda x: x.unique_id.iloc[0] in _ids)
        .groupby("unique_id")
        .apply(transform_group_monthly)
        .reset_index(drop=True)
    )
    return y_df


def transform_group_monthly(df):
    unique_id = df.unique_id.iloc[0]  # Get the unique ID of the current group
    _cnt = 60  # Set the count for the number of months
    _start = pd.Timestamp("2018-01-01")  # Start date for the transformed data
    _end = _start + pd.DateOffset(months=_cnt)  # End date for the transformed data
    date_idx = pd.date_range(start=_start, end=_end, freq="M", name="date")  # Generate the date range for monthly data
    _df = (
        pd.DataFrame(data=[], index=date_idx)  # Create an empty DataFrame with the date range
        .reset_index()
        .rename(columns={"index": "date"})  # Rename the index column to "date"
    )
    _df["unique_id"] = unique_id  # Add the unique ID column
    _df["y"] = df[:60].y.values  # Add the target variable column, limited to 60 entries
    return _df


(
    spark.createDataFrame(create_m4_monthly())  # Create a Spark DataFrame from the transformed data
    .write.format("delta").mode("overwrite")  # Write the DataFrame to Delta format, overwriting any existing data
    .saveAsTable(f"{catalog}.{db}.m4_monthly_train")  # Save the table in the specified catalog and schema
)
# Print a confirmation message
print(f"Saved data to {catalog}.{db}.m4_monthly_train")
# COMMAND ----------
# MAGIC %md
# MAGIC © 2024 Databricks, Inc. All rights reserved.
# MAGIC
# MAGIC The sources in all notebooks in this directory and the sub-directories are provided subject to the Databricks License. All included or referenced third party libraries are subject to the licenses set forth below.
# MAGIC
# MAGIC | library | description | license | source |
# MAGIC |----------------------------------------|-------------------------|------------|-----------------------------------------------------|
# MAGIC | datasetsforecast | Datasets for Time series forecasting | MIT | https://pypi.org/project/datasetsforecast/