diff --git a/docs/source/user_guide/infilling.rst b/docs/source/user_guide/infilling.rst index 2f7b75d..8dc36a0 100644 --- a/docs/source/user_guide/infilling.rst +++ b/docs/source/user_guide/infilling.rst @@ -77,21 +77,33 @@ In more detail ============== The :meth:`~time_stream.TimeFrame.infill` method is the entry point for infilling your -timeseries data in **Time-Stream**. It delegates to well established methods from the `SciPy data science library -`_, combined with the time-integrity of your **TimeFrame**. +timeseries data in **Time-Stream**. Various infill methods are available, from using alternative data from +another source to delegating to well-established methods from the `SciPy data science library +`_. All methods are combined with the time-integrity +of your **TimeFrame**. + +Let's look at the method in more detail: + +.. automethod:: time_stream.TimeFrame.infill Infill methods -------------- -Choose how missing values are estimated by passing a method name as a string. Each method has its strengths, -depending on your data. +The ``infill_method`` parameter lets you choose how missing values are estimated by passing a method name as a string. +Each method has its strengths, depending on your data. The currently available methods are: + +Simple infilling techniques +~~~~~~~~~~~~~~~~~~~~~~~~~~~ +- ``"alt_data"`` - **infill using data from an alternative source.** + + Either another column in your TimeFrame, or data from a different DataFrame entirely. Polynomial interpolation ~~~~~~~~~~~~~~~~~~~~~~~~ - ``"linear"`` - **straight-line interpolation between neighbouring points.** - Simple and neutral; best for very short gaps (1–2 steps). + Simple and neutral; best for short gaps. - ``"quadratic"`` - **second-order polynomial curve.** @@ -103,7 +115,7 @@ Polynomial interpolation - ``"bspline"`` - **B-spline interpolation (configurable order).** - Flexible piecewise polynomials; user decides.* + Flexible piecewise polynomials; user decides. 
Shape-preserving methods ~~~~~~~~~~~~~~~~~~~~~~~~ @@ -123,25 +135,21 @@ Shape-preserving methods .. note:: - NaN values at the very beginning and very end of a timeseries will remain NaN; there is no pre- or post- data to - constrain the infilling method. - -Column selection ----------------- - -Specify which column to infill; only this column will be used by the infill function. - + For infill methods using interpolation techniques, NaN values at the very beginning and very end of a timeseries + will remain NaN; there is no pre- or post- data to constrain the infilling method. Column selection ---------------- -Specify which column to infill; only this column will be used by the infill function. +The ``column_name`` parameter lets you specify which column to infill; only this column will be used by the infill +function. Observation interval -------------------- -Specify an observation interval to restrict infilling to a **specific time window**. This is useful when: +The ``observation_interval`` parameter lets you specify an observation interval to restrict infilling +to a **specific time window**. This is useful when: - You only want to work with a subset of data (e.g. one hydrological year). - You want to fill recent gaps without touching the historical record. @@ -164,7 +172,7 @@ This will only attempt infilling **between January to Decemeber 2024**; gaps out Max gap size ------------ -Use the maximum gap size to prevent **over-eager interpolation**. Only gaps less than this +Use the ``max_gap_size`` parameter to prevent **over-eager interpolation**. Only gaps less than this (measured in consecutive missing **steps**) will be infilled. Example: @@ -183,11 +191,71 @@ Example: At 15-minute resolution, ``max_gap_size=2`` = 30 minutes; at daily resolution, ``max_gap_size=2`` = 2 days. 
-Visualisation of methods -======================== +Examples +======== + +Alternative data infilling +-------------------------- + +The ``"alt_data"`` infill method allows you to fill missing values in a column using data from an alternative source. + +You can specify the alternative data in two ways: + +1. **From a column within the same TimeFrame**: If the alternative data is already present as a column in your + current :class:`~time_stream.TimeFrame` object, you can directly reference it. +2. **From a separate DataFrame**: You can provide an entirely separate + Polars DataFrame containing the alternative data. + +In both cases, you can also apply a ``correction_factor`` to the alternative data before it's used for infilling. + +Infilling from a separate DataFrame +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Let's say you have a primary dataset with missing "flow" values, and a separate ``alt_df`` with an "alt_flow" column that +can be used to infill these gaps. + +**Input:** + +.. tab-set:: + :class: outline padded-tabs + + .. tab-item:: Main Data + + .. jupyter-execute:: + :hide-code: + + import examples_infilling + ts = examples_infilling.alt_data_main() + + .. tab-item:: Alternative Data + + .. jupyter-execute:: + :hide-code: + + import examples_infilling + ts = examples_infilling.alt_data_alt() + +**Code:** + +.. literalinclude:: ../../../src/time_stream/examples/examples_infilling.py + :language: python + :start-after: [start_block_2] + :end-before: [end_block_2] + :dedent: + +**Output:** + +.. jupyter-execute:: + :hide-code: + + import examples_infilling + ts = examples_infilling.alt_data_infill() + +Visualisation of interpolation methods +====================================== -A quick visualisation of the results from the different infill methods is sometimes useful. However, bear in mind -that this is a very simplistic example and the correct method to use is dependent on your data. 
+A quick visualisation of the results from the different interpolation infill methods is sometimes useful. However, +bear in mind that this is a very simplistic example and the correct method to use is dependent on your data. You should do your research into which is most appropriate. .. jupyter-execute:: diff --git a/src/time_stream/examples/examples_infilling.py b/src/time_stream/examples/examples_infilling.py index 292604a..2d73f9c 100644 --- a/src/time_stream/examples/examples_infilling.py +++ b/src/time_stream/examples/examples_infilling.py @@ -61,6 +61,34 @@ def create_simple_time_series_with_gaps() -> ts.TimeFrame: return tf +def alt_data_main() -> pl.DataFrame: + with suppress_output(): + df = get_example_df(library="polars") + print(df) + return df + + +def alt_data_alt() -> pl.DataFrame: + with suppress_output(): + alt_df = get_example_df(library="polars", complete=True) + alt_df = alt_df.with_columns(pl.col("flow").mul(1.25).alias("alt_flow")).drop("flow") + print(alt_df) + return alt_df + + +def alt_data_infill() -> None: + with suppress_output(): + df = alt_data_main() + alt_df = alt_data_alt() + + tf = ts.TimeFrame(df, "time", resolution="PT15M", periodicity="PT15M") + + # [start_block_2] + tf_infill = tf.infill("alt_data", "flow", alt_df=alt_df, correction_factor=0.75, alt_data_column="alt_flow") + # [end_block_2] + print(tf_infill.df) + + def all_infills() -> pl.DataFrame: with suppress_output(): tf = create_simple_time_series_with_gaps() diff --git a/src/time_stream/examples/utils.py b/src/time_stream/examples/utils.py index 2b79163..92794e4 100644 --- a/src/time_stream/examples/utils.py +++ b/src/time_stream/examples/utils.py @@ -19,19 +19,22 @@ def suppress_output() -> Iterator: sys.stdout = original_stdout -def get_example_df(library: str = "polars") -> pd.DataFrame: +def get_example_df(library: str = "polars", complete: bool = False) -> pd.DataFrame: # Create sample data: 15-minute intervals from 2020-09-01 to 2023-11-01 with random flow data 
np.random.seed(31) date_range = pd.date_range(start="2020-09-01", end="2023-11-01", freq="15min") - flow_data = np.random.uniform(10, 100, len(date_range)) + np.sin(np.arange(len(date_range)) * 0.01) * 20 - flow_data[[1, 3, 4, 5, 6, -2, -3]] = np.nan + flow_data = np.random.uniform(90, 100, len(date_range)) + np.sin(np.arange(len(date_range)) * 0.01) * 20 # Create input dataframe df = pd.DataFrame({"time": date_range, "flow": flow_data}) - # Add some NaN values to simulate incomplete data - mask = np.random.random(len(df)) > 0.95 - df.loc[mask, "flow"] = np.nan + if not complete: + # Add some NaN values to simulate incomplete data + mask = np.random.random(len(df)) > 0.95 + df.loc[mask, "flow"] = np.nan + # Target some specific indexes so we can see them on examples + df.iloc[[1, 3, 4, 5, 6, -2, -3], df.columns.get_loc("flow")] = np.nan + if library == "polars": df = pl.DataFrame(df) else: diff --git a/src/time_stream/infill.py b/src/time_stream/infill.py index 5331ef5..35c9480 100644 --- a/src/time_stream/infill.py +++ b/src/time_stream/infill.py @@ -44,12 +44,13 @@ def _infilled_column_name(self, infill_column: str) -> str: return f"{infill_column}_{self.name}" @abstractmethod - def _fill(self, df: pl.DataFrame, infill_column: str) -> pl.DataFrame: + def _fill(self, df: pl.DataFrame, infill_column: str, ctx: InfillCtx) -> pl.DataFrame: """Return the Polars dataframe containing infilled data. Args: df: The DataFrame to infill. infill_column: The column to infill. + ctx: The infill context. 
Returns: pl.DataFrame with infilled values @@ -119,7 +120,7 @@ def execute(self) -> pl.DataFrame: return self.ctx.df # Apply the specific infill logic from the child class - df_infilled = self.infill_method._fill(df, self.column) + df_infilled = self.infill_method._fill(df, self.column, self.ctx) infilled_column = self.infill_method._infilled_column_name(self.column) # Limit the infilled data to where the infill mask is True @@ -215,7 +216,7 @@ def min_points_required(self) -> int: """Minimum number of data points required for this interpolation method.""" pass - def _fill(self, df: pl.DataFrame, infill_column: str) -> pl.DataFrame: + def _fill(self, df: pl.DataFrame, infill_column: str, ctx: InfillCtx) -> pl.DataFrame: """Apply scipy interpolation to fill missing values in the specified column. This method handles the common scipy interpolation workflow: @@ -229,6 +230,7 @@ def _fill(self, df: pl.DataFrame, infill_column: str) -> pl.DataFrame: Args: df: The DataFrame to infill. infill_column: The column to infill. + ctx: The infill context. Returns: pl.DataFrame with infilled values @@ -356,3 +358,63 @@ class PchipInterpolation(ScipyInterpolation): def _create_interpolator(self, x_valid: np.ndarray, y_valid: np.ndarray) -> Any: """Create scipy PCHIP interpolator.""" return PchipInterpolator(x_valid, y_valid, **self.scipy_kwargs) + + +@InfillMethod.register +class AltData(InfillMethod): + """Infill from an alternative data source, with optional correction factor.""" + + name = "alt_data" + + def __init__(self, alt_data_column: str, correction_factor: float = 1.0, alt_df: pl.DataFrame | None = None): + """Initialize the alternative data infill method. + + Args: + alt_data_column: The name of the column providing the alternative data. + correction_factor: An optional correction factor to apply to the alternative data. + alt_df: The DataFrame containing the alternative data. 
+ """ + self.alt_data_column = alt_data_column + self.correction_factor = correction_factor + self.alt_df = alt_df + + def _fill(self, df: pl.DataFrame, infill_column: str, ctx: InfillCtx) -> pl.DataFrame: + """Fill missing values using data from the alternative column. + + Args: + df: The DataFrame to infill. + infill_column: The column to infill. + ctx: The infill context. + + Returns: + pl.DataFrame with infilled values. + """ + if self.alt_df is None: + check_columns_in_dataframe(df, [self.alt_data_column]) + alt_data_column_name = self.alt_data_column + else: + time_column_name = ctx.time_name + check_columns_in_dataframe(self.alt_df, [time_column_name, self.alt_data_column]) + alt_data_column_name = f"__ALT_DATA__{self.alt_data_column}" + alt_df = self.alt_df.select([time_column_name, self.alt_data_column]).rename( + {self.alt_data_column: alt_data_column_name} + ) + + df = df.join( + alt_df, + on=time_column_name, + how="left", + suffix="_alt", + ) + + infilled = df.with_columns( + pl.when(pl.col(infill_column).is_null()) + .then(pl.col(alt_data_column_name) * self.correction_factor) + .otherwise(pl.col(infill_column)) + .alias(self._infilled_column_name(infill_column)) + ) + + if self.alt_df is not None: + infilled = infilled.drop(alt_data_column_name) + + return infilled diff --git a/tests/time_stream/test_infill.py b/tests/time_stream/test_infill.py index 2713497..4d2772a 100644 --- a/tests/time_stream/test_infill.py +++ b/tests/time_stream/test_infill.py @@ -8,10 +8,16 @@ from parameterized import parameterized from polars.testing import assert_frame_equal, assert_series_equal -from time_stream import TimeFrame -from time_stream.exceptions import InfillInsufficientValuesError, RegistryKeyTypeError, UnknownRegistryKeyError +from time_stream import Period, TimeFrame +from time_stream.exceptions import ( + ColumnNotFoundError, + InfillInsufficientValuesError, + RegistryKeyTypeError, + UnknownRegistryKeyError, +) from time_stream.infill import ( 
AkimaInterpolation, + AltData, BSplineInterpolation, CubicInterpolation, InfillCtx, @@ -23,6 +29,9 @@ ) from time_stream.utils import gap_size_count +TIME_COLUMN = "timestamp" +PERIODICITY = Period.of_days(1) + # Data used through the tests LINEAR = pl.DataFrame({"values": [1.0, None, 3.0, None, 5.0]}) # Linear progression QUADRATIC = pl.DataFrame({"values": [0.0, None, 4.0, None, 16.0, None, 36.0]}) # Quadratic data: y = x^2 @@ -170,7 +179,8 @@ class TestLinearInterpolation(unittest.TestCase): ) def test_linear_interpolation_known_result(self, input_data: pl.DataFrame, expected_data: list) -> None: """Test linear interpolation with known data.""" - result = LinearInterpolation()._fill(input_data, "values") + ctx = InfillCtx(input_data, TIME_COLUMN, PERIODICITY) + result = LinearInterpolation()._fill(input_data, "values", ctx) expected = pl.Series("values_linear", expected_data) assert_series_equal(result["values_linear"], expected) @@ -182,12 +192,14 @@ def test_linear_interpolation_known_result(self, input_data: pl.DataFrame, expec ) def test_insufficient_data_raises_error(self, _: str, input_data: pl.DataFrame) -> None: """Test that insufficient data raises InfillInsufficientValuesError.""" + ctx = InfillCtx(input_data, TIME_COLUMN, PERIODICITY) with self.assertRaises(InfillInsufficientValuesError): - LinearInterpolation()._fill(input_data, "values") + LinearInterpolation()._fill(input_data, "values", ctx) def test_complete_data_unchanged(self) -> None: """Test that complete data is unchanged.""" - result = LinearInterpolation()._fill(COMPLETE, "values") + ctx = InfillCtx(COMPLETE, TIME_COLUMN, PERIODICITY) + result = LinearInterpolation()._fill(COMPLETE, "values", ctx) expected = pl.Series("values_linear", COMPLETE) assert_series_equal(result["values_linear"], expected) @@ -201,7 +213,8 @@ class TestQuadraticInterpolation(unittest.TestCase): ) def test_quadratic_interpolation_known_result(self, input_data: pl.DataFrame, expected_data: list) -> None: """Test 
quadratic interpolation with known data.""" - result = QuadraticInterpolation()._fill(input_data, "values") + ctx = InfillCtx(input_data, TIME_COLUMN, PERIODICITY) + result = QuadraticInterpolation()._fill(input_data, "values", ctx) expected = pl.Series("values_quadratic", expected_data) assert_series_equal(result["values_quadratic"], expected) @@ -209,12 +222,14 @@ def test_quadratic_interpolation_known_result(self, input_data: pl.DataFrame, ex @parameterized.expand([("1 data points", INSUFFICIENT_DATA), ("0 data points", ALL_MISSING)]) def test_insufficient_data_raises_error(self, _: str, input_data: pl.DataFrame) -> None: """Test that insufficient data raises InfillInsufficientValuesError.""" + ctx = InfillCtx(input_data, TIME_COLUMN, PERIODICITY) with self.assertRaises(InfillInsufficientValuesError): - QuadraticInterpolation()._fill(input_data, "values") + QuadraticInterpolation()._fill(input_data, "values", ctx) def test_complete_data_unchanged(self) -> None: """Test that complete data is unchanged.""" - result = QuadraticInterpolation()._fill(COMPLETE, "values") + ctx = InfillCtx(COMPLETE, TIME_COLUMN, PERIODICITY) + result = QuadraticInterpolation()._fill(COMPLETE, "values", ctx) expected = pl.Series("values_quadratic", COMPLETE) assert_series_equal(result["values_quadratic"], expected) @@ -227,7 +242,8 @@ class TestCubicInterpolation(unittest.TestCase): ) def test_cubic_interpolation_known_result(self, input_data: pl.DataFrame, expected_data: list) -> None: """Test cubic interpolation with known data.""" - result = CubicInterpolation()._fill(input_data, "values") + ctx = InfillCtx(input_data, TIME_COLUMN, PERIODICITY) + result = CubicInterpolation()._fill(input_data, "values", ctx) expected = pl.Series("values_cubic", expected_data) assert_series_equal(result["values_cubic"], expected) @@ -237,12 +253,14 @@ def test_cubic_interpolation_known_result(self, input_data: pl.DataFrame, expect ) def test_insufficient_data_raises_error(self, _: str, input_data: 
pl.DataFrame) -> None: """Test that insufficient data raises InfillInsufficientValuesError.""" + ctx = InfillCtx(input_data, TIME_COLUMN, PERIODICITY) with self.assertRaises(InfillInsufficientValuesError): - CubicInterpolation()._fill(input_data, "values") + CubicInterpolation()._fill(input_data, "values", ctx) def test_complete_data_unchanged(self) -> None: """Test that complete data is unchanged.""" - result = CubicInterpolation()._fill(COMPLETE, "values") + ctx = InfillCtx(COMPLETE, TIME_COLUMN, PERIODICITY) + result = CubicInterpolation()._fill(COMPLETE, "values", ctx) expected = pl.Series("values_cubic", COMPLETE) assert_series_equal(result["values_cubic"], expected) @@ -264,7 +282,8 @@ def test_initialization(self) -> None: def test_akima_interpolation_with_sufficient_data(self) -> None: """Test akima interpolation works when there is sufficient data (at least 5 points).""" - result = AkimaInterpolation()._fill(CUBIC, "values") + ctx = InfillCtx(CUBIC, TIME_COLUMN, PERIODICITY) + result = AkimaInterpolation()._fill(CUBIC, "values", ctx) self.assertIn("values_akima", result.columns) @parameterized.expand( @@ -277,12 +296,14 @@ def test_akima_interpolation_with_sufficient_data(self) -> None: ) def test_insufficient_data_raises_error(self, _: str, input_data: pl.DataFrame) -> None: """Test that insufficient data raises InfillInsufficientValuesError.""" + ctx = InfillCtx(input_data, TIME_COLUMN, PERIODICITY) with self.assertRaises(InfillInsufficientValuesError): - AkimaInterpolation()._fill(input_data, "values") + AkimaInterpolation()._fill(input_data, "values", ctx) def test_complete_data_unchanged(self) -> None: """Test that complete data is unchanged.""" - result = AkimaInterpolation()._fill(COMPLETE, "values") + ctx = InfillCtx(COMPLETE, TIME_COLUMN, PERIODICITY) + result = AkimaInterpolation()._fill(COMPLETE, "values", ctx) expected = pl.Series("values_akima", COMPLETE) assert_series_equal(result["values_akima"], expected) @@ -304,14 +325,16 @@ def 
test_initialization(self) -> None: def test_pchip_interpolation_with_sufficient_data(self) -> None: """Test akima interpolation works when there is sufficient data (at least 2 points).""" - result = PchipInterpolation()._fill(LINEAR, "values") + ctx = InfillCtx(LINEAR, TIME_COLUMN, PERIODICITY) + result = PchipInterpolation()._fill(LINEAR, "values", ctx) self.assertIn("values_pchip", result.columns) @parameterized.expand([("1 data points", INSUFFICIENT_DATA), ("0 data points", ALL_MISSING)]) def test_insufficient_data_raises_error(self, _: str, input_data: pl.DataFrame) -> None: """Test that insufficient data raises InfillInsufficientValuesError.""" + ctx = InfillCtx(input_data, TIME_COLUMN, PERIODICITY) with self.assertRaises(InfillInsufficientValuesError): - PchipInterpolation()._fill(input_data, "values") + PchipInterpolation()._fill(input_data, "values", ctx) @parameterized.expand( [ @@ -322,7 +345,8 @@ def test_insufficient_data_raises_error(self, _: str, input_data: pl.DataFrame) ) def test_pchip_monotonic_preservation(self, input_data: pl.DataFrame) -> None: """Part of the pchip behaviour is that it should preserve local monotonicity if the input data is monotonic.""" - result = PchipInterpolation()._fill(input_data, "values") + ctx = InfillCtx(input_data, TIME_COLUMN, PERIODICITY) + result = PchipInterpolation()._fill(input_data, "values", ctx) interpolated = result["values_pchip"].to_numpy() # Check that result is monotonically increasing @@ -330,7 +354,8 @@ def test_pchip_monotonic_preservation(self, input_data: pl.DataFrame) -> None: def test_complete_data_unchanged(self) -> None: """Test that complete data is unchanged.""" - result = PchipInterpolation()._fill(COMPLETE, "values") + ctx = InfillCtx(COMPLETE, TIME_COLUMN, PERIODICITY) + result = PchipInterpolation()._fill(COMPLETE, "values", ctx) expected = pl.Series("values_pchip", COMPLETE) assert_series_equal(result["values_pchip"], expected) @@ -419,3 +444,111 @@ def test_apply_edge_cases( ) expected 
= self.create_tf(pl.DataFrame({"values": expected})) assert_frame_equal(result, expected.df, check_column_order=False) + + +class TestAltData(unittest.TestCase): + def setUp(self) -> None: + self.df = pl.DataFrame( + { + "timestamp": [ + datetime(2025, 1, 1), + datetime(2025, 1, 2), + datetime(2025, 1, 3), + datetime(2025, 1, 4), + datetime(2025, 1, 5), + ], + "values": [1.0, None, 3.0, None, 5.0], + "alt_values": [10.0, 20.0, 30.0, 40.0, 50.0], + "alt_with_missing": [10.0, None, 30.0, 40.0, None], + } + ) + self.tf = TimeFrame(self.df, "timestamp", "P1D") + + def test_alt_data_infill(self) -> None: + """Test basic infilling from an alternative column.""" + infiller = AltData(alt_data_column="alt_values") + result_df = infiller.apply(self.tf.df, self.tf.time_name, self.tf.periodicity, "values") + expected_df = self.df.with_columns(pl.Series("values", [1.0, 20.0, 3.0, 40.0, 5.0])) + assert_frame_equal(result_df, expected_df, check_column_order=False) + + def test_alt_data_infill_with_correction(self) -> None: + """Test infilling with a correction factor.""" + infiller = AltData(alt_data_column="alt_values", correction_factor=0.1) + result_df = infiller.apply(self.tf.df, self.tf.time_name, self.tf.periodicity, "values") + expected_df = self.df.with_columns(pl.Series("values", [1.0, 2.0, 3.0, 4.0, 5.0])) + assert_frame_equal(result_df, expected_df, check_column_order=False) + + def test_alt_data_infill_no_missing_data(self) -> None: + """Test that nothing happens when there is no missing data.""" + df_complete = self.df.with_columns(pl.Series("values", [1.0, 2.0, 3.0, 4.0, 5.0])) + tf_complete = TimeFrame(df_complete, "timestamp", "P1D") + infiller = AltData(alt_data_column="alt_values") + result_df = infiller.apply(tf_complete.df, tf_complete.time_name, tf_complete.periodicity, "values") + assert_frame_equal(result_df, tf_complete.df, check_column_order=False) + + def test_alt_data_infill_missing_alt_data(self) -> None: + """Test that missing data in the alternative 
column is not used for infilling.""" + infiller = AltData(alt_data_column="alt_with_missing") + result_df = infiller.apply(self.tf.df, self.tf.time_name, self.tf.periodicity, "values") + expected_df = self.df.with_columns(pl.Series("values", [1.0, None, 3.0, 40.0, 5.0])) + assert_frame_equal(result_df, expected_df, check_column_order=False) + + def test_alt_data_infill_missing_alt_data_column_column(self) -> None: + """Test that an error is raised if the alt_data_column column is missing.""" + infiller = AltData(alt_data_column="non_existent_column") + with self.assertRaises(ColumnNotFoundError): + infiller.apply(self.tf.df, self.tf.time_name, self.tf.periodicity, "values") + + def test_alt_data_infill_restricting_date_range(self) -> None: + """Test that only data in the observation_interval is infilled.""" + infiller = AltData(alt_data_column="alt_values") + result_df = infiller.apply( + self.tf.df, + self.tf.time_name, + self.tf.periodicity, + "values", + observation_interval=(datetime(2025, 1, 1), datetime(2025, 1, 2)), + ) + expected_df = self.df.with_columns(pl.Series("values", [1.0, 20.0, 3.0, None, 5.0])) + assert_frame_equal(result_df, expected_df, check_column_order=False) + + def test_alt_data_infill_with_alt_data_provided(self) -> None: + """Test infilling from a provided alternative DataFrame.""" + alt_df = pl.DataFrame( + { + "timestamp": self.df["timestamp"], + "alt_values_df": [11.0, 22.0, 33.0, 44.0, 55.0], + } + ) + infiller = AltData(alt_data_column="alt_values_df", alt_df=alt_df) + result_df = infiller.apply(self.tf.df, self.tf.time_name, self.tf.periodicity, "values") + expected_df = self.df.with_columns(pl.Series("values", [1.0, 22.0, 3.0, 44.0, 5.0])) + assert_frame_equal(result_df, expected_df, check_column_order=False) + + def test_alt_data_infill_with_alt_data_missing_time_column(self) -> None: + """Test error when provided alt_data is missing the time column.""" + alt_df = pl.DataFrame({"alt_values_df": [11.0, 22.0, 33.0, 44.0, 55.0]}) + 
infiller = AltData(alt_data_column="alt_values_df", alt_df=alt_df) + with self.assertRaises(ColumnNotFoundError): + infiller.apply(self.tf.df, self.tf.time_name, self.tf.periodicity, "values") + + def test_alt_data_infill_with_alt_data_missing_data_column(self) -> None: + """Test error when provided alt_data is missing the data column.""" + alt_df = pl.DataFrame({"timestamp": self.df["timestamp"]}) + infiller = AltData(alt_data_column="non_existent_column", alt_df=alt_df) + with self.assertRaises(ColumnNotFoundError): + infiller.apply(self.tf.df, self.tf.time_name, self.tf.periodicity, "values") + + def test_alt_data_infill_with_alt_data_and_column_in_main_df(self) -> None: + """Test that alt_data is prioritized when column name exists in main df.""" + alt_df = pl.DataFrame( + { + "timestamp": self.df["timestamp"], + "values": [11.0, 22.0, 33.0, 44.0, 55.0], + } + ) + infiller = AltData(alt_data_column="values", alt_df=alt_df) + + result_df = infiller.apply(self.tf.df, self.tf.time_name, self.tf.periodicity, "values") + expected_df = self.df.with_columns(pl.Series("values", [1.0, 22.0, 3.0, 44.0, 5.0])) + assert_frame_equal(result_df, expected_df, check_column_order=False)
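
For readers who want the intended ``alt_data`` semantics without reading the Polars expressions, here is a minimal, dependency-free sketch. It is not the patch's implementation: ``infill_from_alt``, ``main`` and ``alt`` are hypothetical names, and plain lists of ``(timestamp, value)`` pairs stand in for the DataFrames. It assumes the behaviour the diff and its tests appear to describe: alternative values are matched on the time column, scaled by ``correction_factor``, and used only where the target value is missing. The ``observation_interval`` and ``max_gap_size`` masking applied later in ``execute`` is deliberately left out.

```python
from datetime import datetime


def infill_from_alt(main, alt, correction_factor=1.0):
    """Fill None values in `main` from `alt`, matched on timestamp.

    Hypothetical helper for illustration only; `main` and `alt` are
    lists of (timestamp, value) pairs rather than Polars DataFrames.
    """
    # The lookup table plays the role of the left join on the time column.
    alt_by_time = dict(alt)
    filled = []
    for timestamp, value in main:
        if value is None:
            alt_value = alt_by_time.get(timestamp)
            # A missing or absent alternative value leaves the gap in place.
            value = None if alt_value is None else alt_value * correction_factor
        # Values that were already present are never overwritten.
        filled.append((timestamp, value))
    return filled


main = [
    (datetime(2025, 1, 1), 1.0),
    (datetime(2025, 1, 2), None),
    (datetime(2025, 1, 3), 3.0),
    (datetime(2025, 1, 4), None),
]
alt = [
    (datetime(2025, 1, 1), 10.0),
    (datetime(2025, 1, 2), 20.0),
    (datetime(2025, 1, 4), None),  # the alternative source has its own gap
]

result = infill_from_alt(main, alt, correction_factor=0.5)
print([value for _, value in result])  # [1.0, 10.0, 3.0, None]
```

The gap on 2 January is filled with ``20.0 * 0.5``, while the gap on 4 January stays empty because the alternative source is also missing there, which mirrors what ``test_alt_data_infill_missing_alt_data`` checks.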