Draft
68 changes: 0 additions & 68 deletions docs/windowing.md
@@ -594,7 +594,6 @@ if __name__ == '__main__':


### Early window expiration with triggers
!!! info New in v3.24.0

To expire windows before their natural expiration time based on custom conditions, you can pass `before_update` or `after_update` callbacks to the `.tumbling_window()` and `.hopping_window()` methods.

@@ -730,73 +729,6 @@ Also, specifying a grace period using `grace_ms` will increase the latency, beca

You can use `final()` mode when some latency is allowed, but the emitted results must be complete and unique.

## Closing strategies

By default, windows use the **key** closing strategy.
In this strategy, messages advance time and close only windows with the **same** message key.

If some message keys appear irregularly in the stream, their latest windows can remain open, and their results unemitted, until another message with the same key is received.

```python
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Sum

app = Application(...)
sdf = app.dataframe(...)

# Calculate a sum of values over a window of 10 seconds
# and use .final() to emit results only when the window is complete
sdf = sdf.tumbling_window(timedelta(seconds=10)).agg(value=Sum()).final(closing_strategy="key")

# Details:
# -> Timestamp=100, Key="A", value=1 -> emit nothing (the window is not closed yet)
# -> Timestamp=101, Key="B", value=2 -> emit nothing (the window is not closed yet)
# -> Timestamp=105, Key="C", value=3 -> emit nothing (the window is not closed yet)
# -> Timestamp=10100, Key="B", value=2 -> emit one message with key "B" and value {"start": 0, "end": 10000, "value": 2}, the time has progressed beyond the window end for the "B" key only.
# -> Timestamp=8000, Key="A", value=1 -> emit nothing (the window is not closed yet)
# -> Timestamp=10001, Key="A", value=1 -> emit one message with key "A" and value {"start": 0, "end": 10000, "value": 2}, the time has progressed beyond the window end for the "A" key.

# Results:
# (key="B", value={"start": 0, "end": 10000, "value": 2})
# (key="A", value={"start": 0, "end": 10000, "value": 2})
# No message for key "C": its window is never closed because no message with key "C" and a timestamp later than 10000 was received
```

An alternative is to use the **partition** closing strategy.
In this strategy, messages advance time and close windows for the whole partition to which this key belongs.

If messages aren't ordered across keys, some messages can be skipped because the windows they belong to are already closed.

```python
from datetime import timedelta
from quixstreams import Application
from quixstreams.dataframe.windows import Sum

app = Application(...)
sdf = app.dataframe(...)

# Calculate a sum of values over a window of 10 seconds
# and use .final() to emit results only when the window is complete
sdf = sdf.tumbling_window(timedelta(seconds=10)).agg(value=Sum()).final(closing_strategy="partition")

# Details:
# -> Timestamp=100, Key="A", value=1 -> emit nothing (the window is not closed yet)
# -> Timestamp=101, Key="B", value=2 -> emit nothing (the window is not closed yet)
# -> Timestamp=105, Key="C", value=3 -> emit nothing (the window is not closed yet)
# -> Timestamp=10100, Key="B", value=1 -> emit three messages, the time has progressed beyond the window end for all the keys in the partition
# 1. first one with key "A" and value {"start": 0, "end": 10000, "value": 1}
# 2. second one with key "B" and value {"start": 0, "end": 10000, "value": 2}
# 3. third one with key "C" and value {"start": 0, "end": 10000, "value": 3}
# -> Timestamp=8000, Key="A", value=1 -> emit nothing and value isn't part of the sum (the window is already closed)
# -> Timestamp=10001, Key="A", value=1 -> emit nothing (the window is not closed yet)

# Results:
# (key="A", value={"start": 0, "end": 10000, "value": 1})
# (key="B", value={"start": 0, "end": 10000, "value": 2})
# (key="C", value={"start": 0, "end": 10000, "value": 3})
```
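
The difference between the two strategies can be sketched with a small pure-Python simulation of a single partition. This is only an illustration of the behavior described above, not how Quix Streams implements it: the `simulate()` function, the `WINDOW_MS` constant, and the in-memory dictionaries are hypothetical names introduced for this sketch, and grace periods are ignored.

```python
from collections import defaultdict

WINDOW_MS = 10_000  # tumbling window size, as in the examples above


def simulate(messages, closing_strategy):
    """Replay (timestamp, key, value) messages from one partition and return
    the closed windows as (key, result) tuples in emission order."""
    sums = {}                    # (key, window_start) -> running sum of open windows
    key_time = defaultdict(int)  # latest timestamp seen per key
    partition_time = 0           # latest timestamp seen in the whole partition
    emitted = []

    for timestamp, key, value in messages:
        start = timestamp - timestamp % WINDOW_MS
        end = start + WINDOW_MS

        # The strategy decides which clock tells us whether a window is closed
        watermark = key_time[key] if closing_strategy == "key" else partition_time
        if end <= watermark:
            continue  # late message: its window is already closed, so skip it

        sums[(key, start)] = sums.get((key, start), 0) + value

        # Advance both clocks, then re-read the one used by this strategy
        key_time[key] = max(key_time[key], timestamp)
        partition_time = max(partition_time, timestamp)
        watermark = key_time[key] if closing_strategy == "key" else partition_time

        # Close expired windows: same key only, or every key in the partition
        for window_key, window_start in list(sums):
            same_scope = closing_strategy == "partition" or window_key == key
            if same_scope and window_start + WINDOW_MS <= watermark:
                total = sums.pop((window_key, window_start))
                emitted.append(
                    (
                        window_key,
                        {
                            "start": window_start,
                            "end": window_start + WINDOW_MS,
                            "value": total,
                        },
                    )
                )
    return emitted


messages = [
    (100, "A", 1),
    (101, "B", 2),
    (105, "C", 3),
    (10100, "B", 2),
    (8000, "A", 1),
    (10001, "A", 1),
]

key_results = simulate(messages, "key")
partition_results = simulate(messages, "partition")

print(key_results)
print(partition_results)
```

With the **key** strategy, the out-of-order message at `Timestamp=8000` is still added to the window for key "A", and key "C" never emits. With the **partition** strategy, all three windows close as soon as the partition time passes 10000, and the same out-of-order message is dropped.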

## Transforming the result of a windowed aggregation
Windowed aggregations return aggregated results in the following format/schema:

2 changes: 1 addition & 1 deletion quixstreams/__init__.py
@@ -5,4 +5,4 @@

__all__ = ["Application", "message_context", "MessageContext", "State"]

__version__ = "3.23.1"
__version__ = "4.0.0a2"