diff --git a/docs/source/user_guide/aggregation.rst b/docs/source/user_guide/aggregation.rst
index 5150b4b..02ab845 100644
--- a/docs/source/user_guide/aggregation.rst
+++ b/docs/source/user_guide/aggregation.rst
@@ -177,6 +177,12 @@ Choose how values inside each window are summarised. Pass a **string** correspon
 
   Common in hydrology for annual maxima (AMAX) or flood frequency analysis.
 
+- ``"percentile"`` - **The 'nth' percentile value for the period.**
+
+  Useful for capturing extremes within a given period, such as the 5th or 95th percentile of streamflow.
+  The percentile value to be calculated is provided as an integer parameter (p) from 0 to 100 (inclusive).
+
+
 Column selection
 ----------------
 
diff --git a/src/time_stream/aggregation.py b/src/time_stream/aggregation.py
index 465ba7d..b95a853 100644
--- a/src/time_stream/aggregation.py
+++ b/src/time_stream/aggregation.py
@@ -72,7 +72,12 @@ def apply(
         """
         ctx = AggregationCtx(df, time_name, time_anchor, periodicity)
         pipeline = AggregationPipeline(
-            self, ctx, aggregation_period, columns, aggregation_time_anchor, missing_criteria
+            self,
+            ctx,
+            aggregation_period,
+            columns,
+            aggregation_time_anchor,
+            missing_criteria,
         )
         return pipeline.execute()
 
@@ -364,3 +369,35 @@ def expr(self, ctx: AggregationCtx, columns: list[str]) -> list[pl.Expr]:
             ]
         )
         return expressions
+
+
+@AggregationFunction.register
+class Percentile(AggregationFunction):
+    """An aggregation class to find the nth percentile of values within each aggregation period."""
+
+    name = "percentile"
+
+    def __init__(self, p: int):
+        """
+        Initialise Percentile aggregation.
+
+        Args:
+            p: The integer percentile value to apply, from 0 to 100 (inclusive).
+
+        """
+        super().__init__()
+
+        self.p = p
+
+    def expr(self, ctx: AggregationCtx, columns: list[str]) -> list[pl.Expr]:
+        """Return the Polars expressions for calculating the percentile of each column."""
+
+        # Validate that p is an integer value from 0 to 100, then convert it to the equivalent quantile in [0, 1].
+        if not isinstance(self.p, (int, float)) or not float(self.p).is_integer() or not (0 <= self.p <= 100):
+            raise ValueError("The percentile value must be provided as an integer value from 0 to 100")
+
+        quantile = self.p / 100
+
+        expressions = [(pl.col(col).quantile(quantile).alias(f"{self.name}_{col}")) for col in columns]
+
+        return expressions
diff --git a/src/time_stream/base.py b/src/time_stream/base.py
index 9a7c421..8a9dc81 100644
--- a/src/time_stream/base.py
+++ b/src/time_stream/base.py
@@ -509,6 +509,7 @@ def aggregate(
         columns: str | list[str] | None = None,
         missing_criteria: tuple[str, float | int] | None = None,
         aggregation_time_anchor: TimeAnchor | None = None,
+        **kwargs,
     ) -> Self:
         """Apply an aggregation function to a column in this TimeFrame, check the aggregation satisfies user
         requirements and return a new derived TimeFrame containing the aggregated data.
@@ -519,12 +520,13 @@ def aggregate(
             columns: The column(s) containing the data to be aggregated. If omitted, will use all data columns.
             missing_criteria: How the aggregation handles missing data
             aggregation_time_anchor: The time anchor for the aggregation result.
+            **kwargs: Parameters specific to the aggregation function.
 
         Returns:
             A TimeFrame containing the aggregated data.
""" # Get the aggregation function instance and run the apply method - agg_func = AggregationFunction.get(aggregation_function) + agg_func = AggregationFunction.get(aggregation_function, **kwargs) aggregation_period = configure_period_object(aggregation_period) aggregation_time_anchor = TimeAnchor(aggregation_time_anchor) if aggregation_time_anchor else self.time_anchor diff --git a/tests/time_stream/test_aggregation.py b/tests/time_stream/test_aggregation.py index 5396c19..173dd2d 100644 --- a/tests/time_stream/test_aggregation.py +++ b/tests/time_stream/test_aggregation.py @@ -15,6 +15,7 @@ Mean, MeanSum, Min, + Percentile, Sum, ) from time_stream.base import TimeFrame @@ -274,7 +275,7 @@ class TestSimpleAggregations: no missing data""" @pytest.mark.parametrize( - "input_tf,aggregator,target_period,column,timestamps,counts,values,timestamps_of", + "input_tf,aggregator,target_period,column,timestamps,counts,values,timestamps_of,kwargs", [ ( TS_PT1H_2DAYS, @@ -285,6 +286,7 @@ class TestSimpleAggregations: [24, 24], {"value": [11.5, 35.5]}, None, + {}, ), ( TS_PT1H_2DAYS, @@ -295,6 +297,7 @@ class TestSimpleAggregations: [24, 24], {"value": [23, 47]}, [datetime(2025, 1, 1, 23), datetime(2025, 1, 2, 23)], + {}, ), ( TS_PT1H_2DAYS, @@ -305,6 +308,7 @@ class TestSimpleAggregations: [24, 24], {"value": [0, 24]}, [datetime(2025, 1, 1), datetime(2025, 1, 2)], + {}, ), ( TS_PT1H_2DAYS, @@ -315,6 +319,7 @@ class TestSimpleAggregations: [24, 24], {"value": [276, 852]}, None, + {}, ), ( TS_PT1H_2DAYS, @@ -325,6 +330,18 @@ class TestSimpleAggregations: [24, 24], {"value": [276, 852]}, None, + {}, + ), + ( + TS_PT1H_2DAYS, + Percentile, + P1D, + "value", + [datetime(2025, 1, 1), datetime(2025, 1, 2)], + [24, 24], + {"value": [22, 46]}, + None, + {"p": 95}, ), ], ids=[ @@ -333,6 +350,7 @@ class TestSimpleAggregations: "hourly to daily min", "hourly to daily mean_sum", "hourly to daily sum", + "hourly_to_daily_95_percentile", ], ) def test_microsecond_to_microsecond( @@ -345,11 
+363,12 @@ def test_microsecond_to_microsecond( counts: list, values: dict, timestamps_of: list | None, + kwargs: dict[str, Any] | None, ) -> None: """Test aggregations of microsecond-based (i.e., 1 day or less) resolution data, to another microsecond-based resolution.""" expected_df = generate_expected_df(timestamps, aggregator, column, values, counts, counts, timestamps_of) - result = aggregator().apply( + result = aggregator(**kwargs).apply( input_tf.df, input_tf.time_name, input_tf.time_anchor, @@ -361,7 +380,7 @@ def test_microsecond_to_microsecond( assert_frame_equal(result, expected_df, check_dtype=False, check_column_order=False) @pytest.mark.parametrize( - "input_tf,aggregator,target_period,column,timestamps,counts,values,timestamps_of", + "input_tf,aggregator,target_period,column,timestamps,counts,values,timestamps_of,kwargs", [ ( TS_PT1H_2MONTH, @@ -372,6 +391,7 @@ def test_microsecond_to_microsecond( [744, 672], {"value": [371.5, 1079.5]}, None, + {}, ), ( TS_PT1H_2MONTH, @@ -382,6 +402,7 @@ def test_microsecond_to_microsecond( [744, 672], {"value": [743, 1415]}, [datetime(2025, 1, 31, 23), datetime(2025, 2, 28, 23)], + {}, ), ( TS_PT1H_2MONTH, @@ -392,6 +413,7 @@ def test_microsecond_to_microsecond( [744, 672], {"value": [0, 744]}, [datetime(2025, 1, 1), datetime(2025, 2, 1)], + {}, ), ( TS_PT1H_2MONTH, @@ -402,6 +424,7 @@ def test_microsecond_to_microsecond( [744, 672], {"value": [276396, 725424]}, None, + {}, ), ( TS_PT1H_2MONTH, @@ -412,6 +435,18 @@ def test_microsecond_to_microsecond( [744, 672], {"value": [276396, 725424]}, None, + {}, + ), + ( + TS_PT1H_2MONTH, + Percentile, + P1M, + "value", + [datetime(2025, 1, 1), datetime(2025, 2, 1)], + [744, 672], + {"value": [557, 1247]}, + None, + {"p": 75}, ), ], ids=[ @@ -420,6 +455,7 @@ def test_microsecond_to_microsecond( "hourly to monthly min", "hourly to monthly mean_sum", "hourly to monthly sum", + "hourly_to_monthly_75_percentile", ], ) def test_microsecond_to_month( @@ -432,10 +468,11 @@ def 
test_microsecond_to_month( counts: list, values: dict, timestamps_of: list | None, + kwargs: dict[str, Any], ) -> None: """Test aggregations of microsecond-based (i.e., 1-day or less) resolution data, to a month-based resolution.""" expected_df = generate_expected_df(timestamps, aggregator, column, values, counts, counts, timestamps_of) - result = aggregator().apply( + result = aggregator(**kwargs).apply( input_tf.df, input_tf.time_name, input_tf.time_anchor, @@ -447,7 +484,7 @@ def test_microsecond_to_month( assert_frame_equal(result, expected_df, check_dtype=False, check_column_order=False) @pytest.mark.parametrize( - "input_tf,aggregator,target_period,column,timestamps,counts,values,timestamps_of", + "input_tf,aggregator,target_period,column,timestamps,counts,values,timestamps_of,kwargs", [ ( TS_P1M_2YEARS, @@ -458,6 +495,7 @@ def test_microsecond_to_month( [12, 12], {"value": [5.5, 17.5]}, None, + {}, ), ( TS_P1M_2YEARS, @@ -468,6 +506,7 @@ def test_microsecond_to_month( [12, 12], {"value": [11, 23]}, [datetime(2025, 12, 1), datetime(2026, 12, 1)], + {}, ), ( TS_P1M_2YEARS, @@ -478,6 +517,7 @@ def test_microsecond_to_month( [12, 12], {"value": [0, 12]}, [datetime(2025, 1, 1), datetime(2026, 1, 1)], + {}, ), ( TS_P1M_2YEARS, @@ -488,6 +528,7 @@ def test_microsecond_to_month( [12, 12], {"value": [66, 210]}, None, + {}, ), ( TS_P1M_2YEARS, @@ -498,6 +539,18 @@ def test_microsecond_to_month( [12, 12], {"value": [66, 210]}, None, + {}, + ), + ( + TS_P1M_2YEARS, + Percentile, + P1Y, + "value", + [datetime(2025, 1, 1), datetime(2026, 1, 1)], + [12, 12], + {"value": [3, 15]}, + None, + {"p": 25}, ), ], ids=[ @@ -506,6 +559,7 @@ def test_microsecond_to_month( "monthly to yearly min", "monthly to yearly mean_sum", "monthly to yearly sum", + "monthly_to_yearly_25_percentile", ], ) def test_month_to_month( @@ -518,10 +572,11 @@ def test_month_to_month( counts: list, values: dict, timestamps_of: list | None, + kwargs: dict[str, Any] | None, ) -> None: """Test aggregations 
of month-based resolution data, to a month-based resolution.""" expected_df = generate_expected_df(timestamps, aggregator, column, values, counts, counts, timestamps_of) - result = aggregator().apply( + result = aggregator(**kwargs).apply( input_tf.df, input_tf.time_name, input_tf.time_anchor, @@ -533,7 +588,7 @@ def test_month_to_month( assert_frame_equal(result, expected_df, check_dtype=False, check_column_order=False) @pytest.mark.parametrize( - "input_tf,aggregator,target_period,column,timestamps,counts,values,timestamps_of", + "input_tf,aggregator,target_period,column,timestamps,counts,values,timestamps_of,kwargs", [ ( TS_PT1H_2DAYS, @@ -544,6 +599,7 @@ def test_month_to_month( [24, 24], {"value": [11.5, 35.5], "value_plus1": [12.5, 36.5], "value_times2": [23, 71]}, None, + {}, ), ( TS_PT1H_2DAYS, @@ -554,6 +610,7 @@ def test_month_to_month( [24, 24], {"value": [23, 47], "value_plus1": [24, 48], "value_times2": [46, 94]}, [datetime(2025, 1, 1, 23), datetime(2025, 1, 2, 23)], + {}, ), ( TS_PT1H_2DAYS, @@ -564,6 +621,7 @@ def test_month_to_month( [24, 24], {"value": [0, 24], "value_plus1": [1, 25], "value_times2": [0, 48]}, [datetime(2025, 1, 1), datetime(2025, 1, 2)], + {}, ), ( TS_PT1H_2DAYS, @@ -574,6 +632,7 @@ def test_month_to_month( [24, 24], {"value": [276, 852], "value_plus1": [300, 876], "value_times2": [552, 1704]}, None, + {}, ), ( TS_PT1H_2DAYS, @@ -584,6 +643,18 @@ def test_month_to_month( [24, 24], {"value": [276, 852], "value_plus1": [300, 876], "value_times2": [552, 1704]}, None, + {}, + ), + ( + TS_PT1H_2DAYS, + Percentile, + P1D, + ["value", "value_plus1", "value_times2"], + [datetime(2025, 1, 1), datetime(2025, 1, 2)], + [24, 24], + {"value": [12, 36], "value_plus1": [13, 37], "value_times2": [24, 72]}, + None, + {"p": 50}, ), ], ids=[ @@ -592,6 +663,7 @@ def test_month_to_month( "mult column min", "mult column mean_sum", "mult column sum", + "multi_column_50_percentile", ], ) def test_multi_column( @@ -604,9 +676,10 @@ def test_multi_column( 
counts: list, values: dict, timestamps_of: list | None, + kwargs: dict[str, Any], ) -> None: expected_df = generate_expected_df(timestamps, aggregator, column, values, counts, counts, timestamps_of) - result = aggregator().apply( + result = aggregator(**kwargs).apply( input_tf.df, input_tf.time_name, input_tf.time_anchor, @@ -1275,3 +1348,53 @@ def test_with_no_metadata(self) -> None: result = tf.aggregate(Period.of_months(1), "mean", "value") assert result.metadata == {} + + +class TestPercentileAggregation: + @pytest.mark.parametrize( + "percentile,expected_values", + [ + (0, {"value": [0, 24]}), + (1, {"value": [0, 24]}), + (5, {"value": [1, 25]}), + (25, {"value": [6, 30]}), + (50, {"value": [12, 36]}), + (75, {"value": [17, 41]}), + (95, {"value": [22, 46]}), + (100, {"value": [23, 47]}), + ], + ) + def test_percentile_aggregation(self, percentile: int, expected_values: list[int]) -> None: + input_tf = TS_PT1H_2DAYS + column = "value" + timestamps = [datetime(2025, 1, 1), datetime(2025, 1, 2)] + counts = [24, 24] + + expected_df = generate_expected_df(timestamps, Percentile, column, expected_values, counts, counts, None) + result = Percentile(p=percentile).apply( + df=input_tf.df, + time_name=input_tf.time_name, + time_anchor=input_tf.time_anchor, + periodicity=input_tf.periodicity, + aggregation_period=P1D, + columns="value", + aggregation_time_anchor=input_tf.time_anchor, + ) + assert_frame_equal(result, expected_df, check_dtype=False, check_column_order=False) + + @pytest.mark.parametrize("percentile", [0.000000001, 0.999999, 101, 10000, 1.1, -1, -0.000000000001]) + def test_invalid_percentile(self, percentile: float) -> None: + input_tf = TS_PT1H_2DAYS + + expected_error = "The percentile value must be provided as an integer value from 0 to 100" + + with pytest.raises(ValueError, match=expected_error): + Percentile(p=percentile).apply( + df=input_tf.df, + time_name=input_tf.time_name, + time_anchor=input_tf.time_anchor, + periodicity=input_tf.periodicity, 
+ aggregation_period=P1D, + columns="value", + aggregation_time_anchor=input_tf.time_anchor, + ) diff --git a/tests/time_stream/test_base.py b/tests/time_stream/test_base.py index 850ea31..81b02df 100644 --- a/tests/time_stream/test_base.py +++ b/tests/time_stream/test_base.py @@ -6,6 +6,7 @@ import pytest from polars.testing import assert_frame_equal, assert_frame_not_equal, assert_series_equal +from time_stream.aggregation import Percentile from time_stream.base import TimeFrame from time_stream.bitwise import BitwiseFlag from time_stream.exceptions import ColumnNotFoundError, MetadataError @@ -717,3 +718,32 @@ def test_different_column_metadata(self) -> None: def test_different_object(self, non_tf: Any) -> None: """Test that comparing against a non TimeSeries objects are not equal.""" assert self.tf_original != non_tf + + +class TestAggregate: + def test_aggregate_periodicity(self) -> None: + period = Period.of_hours(1) + length = 48 + df = pl.DataFrame( + { + "timestamp": [period.datetime(period.ordinal(datetime(2025, 1, 1)) + i) for i in range(length)], + "value": list(range(length)), + } + ) + tf = TimeFrame(df=df, time_name="timestamp", resolution=period, periodicity=period) + + expected_df = pl.DataFrame( + { + "timestamp": [datetime(2025, 1, 1, 0, 0, 0), datetime(2025, 1, 2, 0, 0, 0)], + "percentile_value": [22, 46], + "count_value": [24, 24], + "expected_count_timestamp": [24, 24], + "valid_value": [True, True], + } + ) + + aggregated_tf = tf.aggregate( + aggregation_period=Period.of_days(1), aggregation_function=Percentile, columns="value", p=95 + ) + + assert_frame_equal(aggregated_tf.df, expected_df, check_dtypes=False)