-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-38424][planner] Support to parse VECTOR_SEARCH function #27039
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
fsk119
wants to merge
2
commits into
apache:master
Choose a base branch
from
fsk119:parse
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+948
−8
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
117 changes: 117 additions & 0 deletions
117
...common/src/main/java/org/apache/flink/table/connector/source/VectorSearchTableSource.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.connector.source; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
import org.apache.flink.configuration.ReadableConfig; | ||
import org.apache.flink.table.connector.source.search.AsyncVectorSearchFunctionProvider; | ||
import org.apache.flink.table.connector.source.search.VectorSearchFunctionProvider; | ||
import org.apache.flink.types.RowKind; | ||
|
||
import java.io.Serializable; | ||
|
||
/** | ||
* A {@link DynamicTableSource} that search rows of an external storage system by one or more | ||
* vectors during runtime. | ||
* | ||
* <p>Compared to {@link ScanTableSource}, the source does not have to read the entire table and can | ||
* lazily fetch individual values from a (possibly continuously changing) external table when | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be interesting to compare it to a LookupTableSource also. |
||
* necessary. | ||
* | ||
* <p>Note: Compared to {@link ScanTableSource}, a {@link VectorSearchTableSource} does only support | ||
* emitting insert-only changes currently (see also {@link RowKind}). Further abilities are not | ||
* supported. | ||
* | ||
* <p>In the last step, the planner will call {@link #getSearchRuntimeProvider(VectorSearchContext)} | ||
* for obtaining a provider of runtime implementation. The search fields that are required to | ||
* perform a search are derived from a query by the planner and will be provided in the given {@link | ||
* VectorSearchTableSource.VectorSearchContext#getSearchColumns()}. The values for those key fields | ||
* are passed during runtime. | ||
*/ | ||
@PublicEvolving | ||
public interface VectorSearchTableSource extends DynamicTableSource { | ||
|
||
/** | ||
* Returns a provider of runtime implementation for reading the data. | ||
* | ||
* <p>There exist different interfaces for runtime implementation which is why {@link | ||
* VectorSearchRuntimeProvider} serves as the base interface. | ||
* | ||
* <p>Independent of the provider interface, a source implementation can work on either | ||
* arbitrary objects or internal data structures (see {@link org.apache.flink.table.data} for | ||
* more information). | ||
* | ||
* <p>The given {@link VectorSearchContext} offers utilities by the planner for creating runtime | ||
* implementation with minimal dependencies to internal data structures. | ||
* | ||
* @see VectorSearchFunctionProvider | ||
* @see AsyncVectorSearchFunctionProvider | ||
*/ | ||
VectorSearchRuntimeProvider getSearchRuntimeProvider(VectorSearchContext context); | ||
|
||
// -------------------------------------------------------------------------------------------- | ||
// Helper interfaces | ||
// -------------------------------------------------------------------------------------------- | ||
|
||
/** | ||
* Context for creating runtime implementation via a {@link VectorSearchRuntimeProvider}. | ||
* | ||
* <p>It offers utilities by the planner for creating runtime implementation with minimal | ||
* dependencies to internal data structures. | ||
* | ||
* <p>Methods should be called in {@link #getSearchRuntimeProvider(VectorSearchContext)}. | ||
* Returned instances that are {@link Serializable} can be directly passed into the runtime | ||
* implementation class. | ||
*/ | ||
@PublicEvolving | ||
interface VectorSearchContext extends DynamicTableSource.Context { | ||
|
||
/** | ||
* Returns an array of key index paths that should be used during the search. The indices | ||
* are 0-based and support composite keys within (possibly nested) structures. | ||
* | ||
* <p>For example, given a table with data type {@code ROW < i INT, s STRING, r ROW < i2 | ||
* INT, s2 STRING > >}, this method would return {@code [[0], [2, 1]]} when {@code i} and | ||
* {@code s2} are used for performing a lookup. | ||
* | ||
* @return array of key index paths | ||
*/ | ||
int[][] getSearchColumns(); | ||
|
||
/** | ||
* Runtime config provided to provider. The config can be used by planner or vector search | ||
* provider at runtime. For example, async options can be used by planner to choose async | ||
* inference. Other config such as http timeout or retry can be used to configure search | ||
* functions. | ||
*/ | ||
ReadableConfig runtimeConfig(); | ||
} | ||
|
||
/** | ||
* Provides actual runtime implementation for reading the data. | ||
* | ||
* <p>There exist different interfaces for runtime implementation which is why {@link | ||
* VectorSearchRuntimeProvider} serves as the base interface. | ||
* | ||
* @see VectorSearchFunctionProvider | ||
* @see AsyncVectorSearchFunctionProvider | ||
*/ | ||
@PublicEvolving | ||
interface VectorSearchRuntimeProvider {} | ||
} |
38 changes: 38 additions & 0 deletions
38
...ava/org/apache/flink/table/connector/source/search/AsyncVectorSearchFunctionProvider.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.connector.source.search; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
import org.apache.flink.table.connector.source.VectorSearchTableSource; | ||
import org.apache.flink.table.functions.AsyncVectorSearchFunction; | ||
|
||
/** A provider for creating {@link AsyncVectorSearchFunction}. */ | ||
@PublicEvolving | ||
public interface AsyncVectorSearchFunctionProvider | ||
extends VectorSearchTableSource.VectorSearchRuntimeProvider { | ||
|
||
/** Helper function for creating a static provider. */ | ||
static AsyncVectorSearchFunctionProvider of( | ||
AsyncVectorSearchFunction asyncVectorSearchFunction) { | ||
return () -> asyncVectorSearchFunction; | ||
} | ||
|
||
/** Creates an {@link AsyncVectorSearchFunction} instance. */ | ||
AsyncVectorSearchFunction createAsyncVectorSearchFunction(); | ||
} |
37 changes: 37 additions & 0 deletions
37
...ain/java/org/apache/flink/table/connector/source/search/VectorSearchFunctionProvider.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.connector.source.search; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
import org.apache.flink.table.connector.source.VectorSearchTableSource; | ||
import org.apache.flink.table.functions.VectorSearchFunction; | ||
|
||
/** A provider for creating {@link VectorSearchFunction}. */ | ||
@PublicEvolving | ||
public interface VectorSearchFunctionProvider | ||
extends VectorSearchTableSource.VectorSearchRuntimeProvider { | ||
|
||
/** Helper function for creating a static provider. */ | ||
static VectorSearchFunctionProvider of(VectorSearchFunction searchFunction) { | ||
return () -> searchFunction; | ||
} | ||
|
||
/** Creates an {@link VectorSearchFunction} instance. */ | ||
VectorSearchFunction createVectorSearchFunction(); | ||
} |
66 changes: 66 additions & 0 deletions
66
...able-common/src/main/java/org/apache/flink/table/functions/AsyncVectorSearchFunction.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.functions; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
import org.apache.flink.table.api.TableException; | ||
import org.apache.flink.table.data.GenericRowData; | ||
import org.apache.flink.table.data.RowData; | ||
|
||
import java.util.Collection; | ||
import java.util.concurrent.CompletableFuture; | ||
|
||
/** | ||
* A wrapper class of {@link AsyncTableFunction} for asynchronous vector search. | ||
* | ||
* <p>The output type of this table function is fixed as {@link RowData}. | ||
*/ | ||
@PublicEvolving | ||
public abstract class AsyncVectorSearchFunction extends AsyncTableFunction<RowData> { | ||
|
||
/** | ||
* Asynchronously search result based on input row to find topK matched rows. | ||
* | ||
* @param topK - The number of topK matched rows to return. | ||
* @param queryData - A {@link RowData} that wraps input for search function. | ||
* @return A collection of all searched results. | ||
*/ | ||
public abstract CompletableFuture<Collection<RowData>> asyncVectorSearch( | ||
int topK, RowData queryData); | ||
|
||
/** Invokes {@link #asyncVectorSearch} and chains futures. */ | ||
public void eval(CompletableFuture<Collection<RowData>> future, Object... args) { | ||
int topK = (int) args[0]; | ||
GenericRowData argsData = GenericRowData.of(args[1]); | ||
asyncVectorSearch(topK, argsData) | ||
.whenComplete( | ||
(result, exception) -> { | ||
if (exception != null) { | ||
future.completeExceptionally( | ||
new TableException( | ||
String.format( | ||
"Failed to execute asynchronously search with input row %s.", | ||
argsData), | ||
exception)); | ||
return; | ||
} | ||
future.complete(result); | ||
}); | ||
} | ||
} |
62 changes: 62 additions & 0 deletions
62
...ink-table-common/src/main/java/org/apache/flink/table/functions/VectorSearchFunction.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.table.functions; | ||
|
||
import org.apache.flink.annotation.PublicEvolving; | ||
import org.apache.flink.table.data.GenericRowData; | ||
import org.apache.flink.table.data.RowData; | ||
import org.apache.flink.util.FlinkRuntimeException; | ||
|
||
import java.io.IOException; | ||
import java.util.Collection; | ||
|
||
/** | ||
* A wrapper class of {@link TableFunction} for synchronous vector search. | ||
* | ||
* <p>The output type of this table function is fixed as {@link RowData}. | ||
*/ | ||
@PublicEvolving | ||
public abstract class VectorSearchFunction extends TableFunction<RowData> { | ||
|
||
/** | ||
* Synchronously search result based on input row to find topK matched rows. | ||
* | ||
* @param topK - The number of topK results to return. | ||
* @param queryData - A {@link RowData} that wraps input for vector search function. | ||
* @return A collection of predicted results. | ||
*/ | ||
public abstract Collection<RowData> vectorSearch(int topK, RowData queryData) | ||
throws IOException; | ||
|
||
/** Invoke {@link #vectorSearch} and handle exceptions. */ | ||
public final void eval(Object... args) { | ||
int topK = (int) args[0]; | ||
GenericRowData argsData = GenericRowData.of(args[1]); | ||
try { | ||
Collection<RowData> results = vectorSearch(topK, argsData); | ||
if (results == null) { | ||
return; | ||
} | ||
results.forEach(this::collect); | ||
} catch (Exception e) { | ||
throw new FlinkRuntimeException( | ||
String.format("Failed to execute search with input row %s.", argsData), e); | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: search -> searches