@@ -729,73 +729,6 @@ Also, specifying a grace period using `grace_ms` will increase the latency, beca
729729
730730You can use ` final() ` mode when some latency is allowed, but the emitted results must be complete and unique.
731731
732- ## Closing strategies
733-
734- By default, windows use the **key** closing strategy.
735- In this strategy, messages advance time and close only windows with the **same** message key.
736-
737- If some message keys appear irregularly in the stream, their latest windows can remain unprocessed until another message with the same key is received.
738-
739- ``` python
740- from datetime import timedelta
741- from quixstreams import Application
742- from quixstreams.dataframe.windows import Sum
743-
744- app = Application(...)
745- sdf = app.dataframe(...)
746-
747- # Calculate a sum of values over a window of 10 seconds
748- # and use .final() to emit results only when the window is complete
749- sdf = sdf.tumbling_window(timedelta(seconds=10)).agg(value=Sum()).final(closing_strategy="key")
750-
751- # Details:
752- # -> Timestamp=100, Key="A", value=1 -> emit nothing (the window is not closed yet)
753- # -> Timestamp=101, Key="B", value=2 -> emit nothing (the window is not closed yet)
754- # -> Timestamp=105, Key="C", value=3 -> emit nothing (the window is not closed yet)
755- # -> Timestamp=10100, Key="B", value=2 -> emit one message with key "B" and value {"start": 0, "end": 10000, "value": 2}, the time has progressed beyond the window end for the "B" key only.
756- # -> Timestamp=8000, Key="A", value=1 -> emit nothing (the window is not closed yet)
757- # -> Timestamp=10001, Key="A", value=1 -> emit one message with key "A" and value {"start": 0, "end": 10000, "value": 2}, the time has progressed beyond the window end for the "A" key.
758-
759- # Results:
760- # (key="B", value={"start": 0, "end": 10000, "value": 2})
761- # (key="A", value={"start": 0, "end": 10000, "value": 2})
762- # No message for key "C": its window is never closed because no message with key "C" and a timestamp later than 10000 was received
763- ```
764-
765- An alternative is to use the **partition** closing strategy.
766- In this strategy, messages advance time and close windows for the whole partition to which this key belongs.
767-
768- If messages aren't ordered across keys, some messages can be skipped because their windows are already closed.
769-
770- ``` python
771- from datetime import timedelta
772- from quixstreams import Application
773- from quixstreams.dataframe.windows import Sum
774-
775- app = Application(...)
776- sdf = app.dataframe(...)
777-
778- # Calculate a sum of values over a window of 10 seconds
779- # and use .final() to emit results only when the window is complete
780- sdf = sdf.tumbling_window(timedelta(seconds=10)).agg(value=Sum()).final(closing_strategy="partition")
781-
782- # Details:
783- # -> Timestamp=100, Key="A", value=1 -> emit nothing (the window is not closed yet)
784- # -> Timestamp=101, Key="B", value=2 -> emit nothing (the window is not closed yet)
785- # -> Timestamp=105, Key="C", value=3 -> emit nothing (the window is not closed yet)
786- # -> Timestamp=10100, Key="B", value=1 -> emit three messages, the time has progressed beyond the window end for all the keys in the partition
787- # 1. first one with key "A" and value {"start": 0, "end": 10000, "value": 1}
788- # 2. second one with key "B" and value {"start": 0, "end": 10000, "value": 2}
789- # 3. third one with key "C" and value {"start": 0, "end": 10000, "value": 3}
790- # -> Timestamp=8000, Key="A", value=1 -> emit nothing and value isn't part of the sum (the window is already closed)
791- # -> Timestamp=10001, Key="A", value=1 -> emit nothing (the window is not closed yet)
792-
793- # Results:
794- # (key="A", value={"start": 0, "end": 10000, "value": 1})
795- # (key="B", value={"start": 0, "end": 10000, "value": 2})
796- # (key="C", value={"start": 0, "end": 10000, "value": 3})
797- ```
798-
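The difference between the two strategies in the removed section can be checked with a small pure-Python model. This is only a sketch and not how Quix Streams implements windows: `simulate`, `EVENTS`, and `WINDOW_MS` are hypothetical names, and the rule that a window closes once the tracked time reaches its end is an assumption made for illustration.

```python
from collections import defaultdict

WINDOW_MS = 10_000  # 10-second tumbling window, as in the examples above

# (timestamp_ms, key, value), in arrival order (hypothetical sample data)
EVENTS = [
    (100, "A", 1),
    (101, "B", 2),
    (105, "C", 3),
    (10100, "B", 2),
    (8000, "A", 1),
    (10001, "A", 1),
]


def simulate(events, closing_strategy):
    """Toy model of a tumbling Sum window that emits only closed windows."""
    sums = defaultdict(int)        # (key, window_start) -> running sum
    watermarks = defaultdict(int)  # time scope -> highest timestamp seen
    emitted = []
    for timestamp, key, value in events:
        # "key": each key tracks its own time; "partition": one shared clock.
        scope = key if closing_strategy == "key" else "__partition__"
        start = timestamp - timestamp % WINDOW_MS
        if start + WINDOW_MS <= watermarks[scope]:
            continue  # the window is already closed, so the message is skipped
        sums[(key, start)] += value
        watermarks[scope] = max(watermarks[scope], timestamp)
        # Close every window in scope whose end has been reached (assumed rule).
        for k, s in sorted(sums, key=lambda ks: (ks[1], ks[0])):
            in_scope = closing_strategy == "partition" or k == key
            if in_scope and s + WINDOW_MS <= watermarks[scope]:
                total = sums.pop((k, s))
                emitted.append((k, {"start": s, "end": s + WINDOW_MS, "value": total}))
    return emitted


for strategy in ("key", "partition"):
    print(strategy, simulate(EVENTS, strategy))
```

With these events, the model emits results for keys "B" and "A" under the key strategy and never closes the window for "C", while the partition strategy emits all three keys at once and drops the late message with timestamp 8000.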
799732## Transforming the result of a windowed aggregation
800733Windowed aggregations return aggregated results in the following format/schema:
801734