[FLINK-38435][table] Refactor codegen and runner for MLPredict and Lo… #27041
Thanks for the contribution! Left some comments
...er/src/main/java/org/apache/flink/table/planner/functions/inference/FunctionCallContext.java
Outdated
...er/src/main/java/org/apache/flink/table/planner/functions/inference/FunctionCallContext.java
...-planner/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
Outdated
    callWithDataType
    }

    try {
Why do we move this out of createLookupTypeInference?
I don't understand. Which part do you mean?
...lanner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCallCodeGenerator.scala
Outdated
...lanner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCallCodeGenerator.scala
...lanner/src/main/scala/org/apache/flink/table/planner/codegen/FunctionCallCodeGenerator.scala
    callContext: FunctionCallContext,
    udf: UserDefinedFunction,
    operands: Seq[GeneratedExpression]) => {
    val inference = TypeInference
Why not call udf.getTypeInference(dataTypeFactory)?
Because type inference relies on the eval method's signature to determine the input and output types. However, the input type of a predict function's eval method is always RowData, so type inference cannot derive the actual types from it. So here we build the type inference directly from the model's input/output schema.
Doesn't ml_predict already have type inference defined? https://github.com/apache/flink/blob/master/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java#L740-L754
But here udf is not ML_PREDICT; it is a subclass of PredictFunction, and Flink still uses the method signature to determine the type inference.
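To make the point above concrete, here is a minimal toy sketch in plain Java. It does not use Flink's API: `Row`, `ToyPredictFunction`, `inferFromSignature`, and `inferFromSchema` are hypothetical stand-ins invented for illustration. It only shows that reflecting on an `eval` method whose parameter is a generic row type recovers no column information, whereas an explicit schema does.

```java
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

final class SchemaDrivenInferenceSketch {

    /** Hypothetical stand-in for a generic row type such as RowData. */
    interface Row {}

    /** Hypothetical stand-in for a PredictFunction subclass. */
    static class ToyPredictFunction {
        public Row eval(Row input) {
            return input;
        }
    }

    /** Signature-based view: reflection only sees the generic row class. */
    static String inferFromSignature(Class<?> udf) {
        for (Method m : udf.getDeclaredMethods()) {
            if (m.getName().equals("eval")) {
                return m.getParameterTypes()[0].getSimpleName();
            }
        }
        throw new IllegalArgumentException("no eval method found");
    }

    /** Schema-based view: column names and types are stated explicitly. */
    static String inferFromSchema(Map<String, String> schema) {
        StringJoiner joiner = new StringJoiner(", ", "ROW<", ">");
        schema.forEach((name, type) -> joiner.add(name + " " + type));
        return joiner.toString();
    }

    public static void main(String[] args) {
        // Assumed example schema; the real model schema is not shown in this thread.
        Map<String, String> inputSchema = new LinkedHashMap<>();
        inputSchema.put("text", "STRING");
        inputSchema.put("length", "INT");

        System.out.println(inferFromSignature(ToyPredictFunction.class)); // prints Row
        System.out.println(inferFromSchema(inputSchema)); // prints ROW<text STRING, length INT>
    }
}
```

The signature-based result carries no field names or types, which is why building the inference from the model's schema is the only option in this toy setting.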
Ah, right
...ime/src/main/java/org/apache/flink/table/runtime/operators/join/lookup/LookupJoinRunner.java
    private void registerMetric(MetricGroup metricGroup) {
        metricGroup.gauge(
                "ai_queue_length", () -> asyncBufferCapacity + 1 - resultFutureBuffer.size());
Is the resultFutureBuffer size fixed to be the same as asyncBufferCapacity?
It just means how many requests are in flight.
I mean resultFutureBuffer.size() is always equal to asyncBufferCapacity + 1 from open(). So this is always 0?
Oh, the resultFutureBuffer size is not fixed. When a record arrives, the operator takes a result buffer to handle the execution. See:
Line 74 in 0c0ac19
    JoinedRowResultFuture buffer = resultFutureBuffer.take();
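The gauge arithmetic discussed here can be sketched in isolation. This is a minimal model, assuming the pool is a blocking queue that open() pre-fills with asyncBufferCapacity + 1 reusable buffers and that a buffer is put back once its request completes; `openPool` and `inFlight` are hypothetical helpers, and plain `Object` stands in for `JoinedRowResultFuture`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class InFlightGaugeSketch {

    /** Mimics open(): pre-fills the pool with capacity + 1 reusable buffers. */
    static BlockingQueue<Object> openPool(int asyncBufferCapacity) {
        BlockingQueue<Object> pool = new ArrayBlockingQueue<>(asyncBufferCapacity + 1);
        for (int i = 0; i < asyncBufferCapacity + 1; i++) {
            pool.add(new Object()); // stand-in for a JoinedRowResultFuture
        }
        return pool;
    }

    /** The gauge expression from the diff: buffers currently taken out of the pool. */
    static int inFlight(int asyncBufferCapacity, BlockingQueue<Object> pool) {
        return asyncBufferCapacity + 1 - pool.size();
    }

    public static void main(String[] args) throws InterruptedException {
        int capacity = 3; // assumed value, for illustration only
        BlockingQueue<Object> pool = openPool(capacity);
        System.out.println(inFlight(capacity, pool)); // prints 0: nothing taken yet

        Object first = pool.take(); // a record arrives and takes a buffer
        pool.take();                // a second record arrives
        System.out.println(inFlight(capacity, pool)); // prints 2

        pool.put(first); // the first request completes and returns its buffer
        System.out.println(inFlight(capacity, pool)); // prints 1
    }
}
```

Under these assumptions the gauge reads 0 only while the operator is idle and rises by one for each buffer taken.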
LGTM!
The failed tests are not related to the PR. Merging...
…okupJoin
What is the purpose of the change
Refactor codegen and runner to reuse utils for MLPredict and Lookup join
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation