Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/source/user_guide/aggregation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ Choose how values inside each window are summarised. Pass a **string** correspon

Common in hydrology for annual maxima (AMAX) or flood frequency analysis.

- ``"percentile"`` - **The 'nth' percentile value for the period.**

Useful for capturing extremes within a given period, such as the 5th or 95th percentile of streamflow.
The percentile to calculate is passed as the integer keyword argument ``p``, from 0 to 100 (inclusive); for example, ``p=95`` for the 95th percentile.


Column selection
----------------

Expand Down
40 changes: 39 additions & 1 deletion src/time_stream/aggregation.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,12 @@ def apply(
"""
ctx = AggregationCtx(df, time_name, time_anchor, periodicity)
pipeline = AggregationPipeline(
self, ctx, aggregation_period, columns, aggregation_time_anchor, missing_criteria
self,
ctx,
aggregation_period,
columns,
aggregation_time_anchor,
missing_criteria,
)
return pipeline.execute()

Expand Down Expand Up @@ -364,3 +369,36 @@ def expr(self, ctx: AggregationCtx, columns: list[str]) -> list[pl.Expr]:
]
)
return expressions


@AggregationFunction.register
class Percentile(AggregationFunction):
    """An aggregation class to find the nth percentile of values within each aggregation period."""

    name = "percentile"

    def __init__(self, p: int):
        """
        Initialise Percentile aggregation:

        Args:
            p: The integer percentile value to apply, from 0 to 100 (inclusive).
               The value is validated when ``expr`` is called, not here.

        """
        super().__init__()

        self.p = p

    def expr(self, ctx: AggregationCtx, columns: list[str]) -> list[pl.Expr]:
        """Return the 'Polars' expression for calculating the percentile

        Args:
            ctx: The aggregation context (not used by this aggregation).
            columns: Names of the columns to calculate the percentile for.

        Returns:
            One expression per column, each aliased as ``percentile_<column>``.

        Raises:
            ValueError: If ``p`` is not a whole number from 0 to 100 (inclusive).
        """
        # Local import so this block does not depend on a new module-level import.
        from numbers import Real

        p = self.p

        # ``int.is_integer()`` only exists from Python 3.12, so calling it directly on ``p`` raises
        # AttributeError for plain ints on older interpreters. Check the type and range first, then
        # test for a whole number via ``float``, which has always provided ``is_integer()``.
        # The range check comes before the ``float`` conversion so huge ints cannot overflow it,
        # and it also rejects NaN (all comparisons with NaN are False).
        is_valid = isinstance(p, Real) and (0 <= p <= 100) and float(p).is_integer()
        if not is_valid:
            raise ValueError("The percentile value must be provided as an integer value from 0 to 100")

        # Polars works with quantiles in the range 0-1, so convert the percentile to its quantile equivalent.
        quantile = p / 100

        expressions = [pl.col(col).quantile(quantile).alias(f"{self.name}_{col}") for col in columns]

        return expressions
4 changes: 3 additions & 1 deletion src/time_stream/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ def aggregate(
columns: str | list[str] | None = None,
missing_criteria: tuple[str, float | int] | None = None,
aggregation_time_anchor: TimeAnchor | None = None,
**kwargs,
) -> Self:
"""Apply an aggregation function to a column in this TimeFrame, check the aggregation satisfies user
requirements and return a new derived TimeFrame containing the aggregated data.
Expand All @@ -519,12 +520,13 @@ def aggregate(
columns: The column(s) containing the data to be aggregated. If omitted, will use all data columns.
missing_criteria: How the aggregation handles missing data
aggregation_time_anchor: The time anchor for the aggregation result.
**kwargs: Parameters specific to the aggregation function.

Returns:
A TimeFrame containing the aggregated data.
"""
# Get the aggregation function instance and run the apply method
agg_func = AggregationFunction.get(aggregation_function)
agg_func = AggregationFunction.get(aggregation_function, **kwargs)
aggregation_period = configure_period_object(aggregation_period)
aggregation_time_anchor = TimeAnchor(aggregation_time_anchor) if aggregation_time_anchor else self.time_anchor

Expand Down
Loading