[FLINK-37213][table-runtime] Improve performance of unbounded OVER aggregations #26075

Merged
merged 1 commit into from
Feb 4, 2025

Contributor

@pnowojski pnowojski commented Jan 24, 2025

Instead of explicitly sorting all of the records by row time, use timers to achieve the same thing.

Compared to the previous version, this one registers a timer for each record, as opposed to just one timer per key. However, since we are using RocksDB for timers, this is a minor problem. In exchange, we:

  • don't have to iterate over all of the state for each timer
  • fire timers only when needed, rather than for each watermark for each key. For example, if watermarks are emitted every 200ms and a given key has only one record that should be fired 20s into the future, the previous version would fire a timer for that key on each watermark unnecessarily, without doing any work.
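
The 200ms / 20s example above can be made concrete with a toy model. This is only a sketch, not the Flink operator code: the class and method names are hypothetical, and it assumes the legacy path fires its per-key timer on every watermark while any record is still buffered, as described above. Timers in the new path are modelled as a sorted set, because timers for the same key and timestamp are deduplicated.

```java
import java.util.PriorityQueue;
import java.util.TreeSet;

/** Toy model (hypothetical, not Flink code) counting timer firings for a single key. */
final class TimerFiringSketch {

    /** Legacy-style model: one timer per key that fires on every watermark while records are buffered. */
    static int legacyFirings(long[] recordTimestamps, long watermarkIntervalMs, long lastWatermarkMs) {
        PriorityQueue<Long> buffered = new PriorityQueue<>();
        for (long ts : recordTimestamps) {
            buffered.add(ts);
        }
        int firings = 0;
        for (long wm = watermarkIntervalMs; wm <= lastWatermarkMs; wm += watermarkIntervalMs) {
            if (buffered.isEmpty()) {
                break; // nothing left for this key, so no timer is re-registered
            }
            firings++; // the per-key timer fires, even if no record is ready yet
            while (!buffered.isEmpty() && buffered.peek() <= wm) {
                buffered.poll(); // emit every record whose row time has been reached
            }
        }
        return firings;
    }

    /** Timer-per-record model: one timer per distinct row time, fired once the watermark passes it. */
    static int perRecordFirings(long[] recordTimestamps, long watermarkIntervalMs, long lastWatermarkMs) {
        TreeSet<Long> timers = new TreeSet<>(); // sorted and deduplicated by timestamp
        for (long ts : recordTimestamps) {
            timers.add(ts);
        }
        int firings = 0;
        for (long wm = watermarkIntervalMs; wm <= lastWatermarkMs; wm += watermarkIntervalMs) {
            while (!timers.isEmpty() && timers.first() <= wm) {
                timers.pollFirst(); // the timer fires exactly when its record is ready
                firings++;
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        long[] records = {20_000L}; // a single record due 20s into the future
        System.out.println("legacy firings:     " + legacyFirings(records, 200L, 20_000L));
        System.out.println("per-record firings: " + perRecordFirings(records, 200L, 20_000L));
    }
}
```

In this model the legacy path fires 100 times (once per watermark) before the record becomes ready, while the timer-per-record path fires once.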

Verifying this change

This is covered by various existing tests, which were parametrised to check both the old and new code paths.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Jan 24, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

.withDescription(
"Which version of the unbounded over aggregation to use: "
+ " 1 - legacy version"
+ " 2 - new version with improved performance");
Contributor

nit: remove the word new so it does not need to be changed if we add a 3rd value at some stage

Contributor

If this goes into the first version 2 - should we default to 2?

Contributor Author

I think we can default to version 2 indeed and keep this as an opt-out feature.
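
A minimal sketch of what "default to 2, opt out to 1" could look like when the operator is selected. The class, enum, and method names here are hypothetical and are not the ones used in this PR; only the meaning of the values 1 and 2 comes from the option description above.

```java
/** Hypothetical sketch of version-based selection; not the actual Flink planner code. */
final class OverAggVersionSketch {

    /** Assumed default, following the discussion above: version 2 unless the user opts out. */
    static final int DEFAULT_VERSION = 2;

    enum Implementation {
        LEGACY,      // version 1: legacy version
        TIMER_BASED  // version 2: version with improved performance
    }

    /** Maps the configured version (null meaning "not set") to an implementation. */
    static Implementation select(Integer configuredVersion) {
        int version = configuredVersion == null ? DEFAULT_VERSION : configuredVersion;
        switch (version) {
            case 1:
                return Implementation.LEGACY;
            case 2:
                return Implementation.TIMER_BASED;
            default:
                // Failing fast keeps a typo in the configuration from silently picking a code path.
                throw new IllegalArgumentException(
                        "Unsupported unbounded over aggregation version: " + version);
        }
    }

    public static void main(String[] args) {
        System.out.println(select(null)); // not configured, falls back to the default
        System.out.println(select(1));    // explicit opt-out to the legacy version
    }
}
```

Rejecting unknown values explicitly also leaves room for a third value later without changing how 1 and 2 behave.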

@davidradl
Contributor

CI failing for spotless

@davidradl
Contributor

Reviewed by Chi on 23/01/2025. Go back to the submitter with review comments.

…gregations

Instead of explicitly sorting all of the records by row time,
use timers to achieve the same thing.

Compared to the previous version, this one registers a timer
for each record, as opposed to just one timer per key. However,
since we are using RocksDB for timers, this is a minor problem.
In exchange, we:
 - don't have to iterate over all of the state for each timer
 - fire timers only when needed, rather than for each watermark
   for each key. For example, if watermarks are emitted every
   200ms and a given key has only one record that should be
   fired 20s into the future, the previous version would fire
   a timer for that key on each watermark unnecessarily,
   without doing any work.
Contributor

@StefanRRichter StefanRRichter left a comment

LGTM

@pnowojski
Contributor Author

CI is green. Thank you for the reviews.

@davidradl I have addressed your minor comments, so I'm going to merge this. If you have any further comments, please let me know; we can discuss them and address them in a separate PR if needed.

@pnowojski pnowojski merged commit 77edff6 into apache:master Feb 4, 2025
@pnowojski pnowojski deleted the f37213 branch February 4, 2025 13:53
4 participants