From 2e6b9d075d1fc32542e8e49550d7d7adfd695431 Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Wed, 23 Apr 2025 00:53:23 +0900 Subject: [PATCH 1/5] save --- .../iotdb/udf/table/ExcludeColumnExample.java | 11 +- .../apache/iotdb/udf/table/RepeatExample.java | 17 ++- .../apache/iotdb/udf/table/SplitExample.java | 23 ++- .../relational/MyErrorTableFunction.java | 11 +- .../example/relational/MyExcludeColumn.java | 11 +- .../example/relational/MyRepeatWithIndex.java | 17 ++- .../relational/MyRepeatWithoutIndex.java | 17 ++- .../query/udf/example/relational/MySplit.java | 23 ++- .../relational/EmptyTableFunctionHandle.java | 32 +++++ .../udf/api/relational/TableFunction.java | 10 +- .../table/MapTableFunctionHandle.java | 132 ++++++++++++++++++ .../table/TableFunctionAnalysis.java | 21 ++- .../relational/table/TableFunctionHandle.java | 19 +++ .../plan/planner/TableOperatorGenerator.java | 2 +- .../analyzer/StatementAnalyzer.java | 1 + .../TableFunctionInvocationAnalysis.java | 8 ++ .../relational/planner/RelationPlanner.java | 1 + .../rule/ImplementTableFunctionSource.java | 2 + .../PruneTableFunctionProcessorColumns.java | 1 + ...neTableFunctionProcessorSourceColumns.java | 1 + .../planner/node/TableFunctionNode.java | 35 ++++- .../node/TableFunctionProcessorNode.java | 26 ++++ .../UnaliasSymbolReferences.java | 3 + .../db/queryengine/plan/function/Exclude.java | 11 +- .../db/queryengine/plan/function/Repeat.java | 18 ++- .../db/queryengine/plan/function/Split.java | 23 ++- .../relational/tvf/CapacityTableFunction.java | 22 ++- .../relational/tvf/CumulateTableFunction.java | 24 +++- .../relational/tvf/HOPTableFunction.java | 26 +++- .../relational/tvf/SessionTableFunction.java | 17 ++- .../relational/tvf/TumbleTableFunction.java | 21 ++- .../tvf/VariationTableFunction.java | 17 ++- 32 files changed, 546 insertions(+), 57 deletions(-) create mode 100644 iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java create mode 100644 iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java create mode 100644 iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java index 99f9023299f6..b1cdf810e414 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/ExcludeColumnExample.java @@ -20,8 +20,10 @@ package org.apache.iotdb.udf.table; import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -85,11 +87,18 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF return TableFunctionAnalysis.builder() .properColumnSchema(schemaBuilder.build()) .requiredColumns(TBL_PARAM, requiredColumns) + .handle(new EmptyTableFunctionHandle()) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new EmptyTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java index bfae7bcd3836..3837f80e6e42 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java @@ -23,7 +23,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -81,22 +83,31 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(N_PARAM, count.getValue()); return TableFunctionAnalysis.builder() .properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build()) .requiredColumns( TBL_PARAM, Collections.singletonList(0)) // per spec, function must require at least one column + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { - ScalarArgument count = (ScalarArgument) arguments.get("N"); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TableFunctionDataProcessor() { - private final int n = (int) count.getValue(); + private final int n = + (int) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(N_PARAM); private long recordIndex = 0; @Override diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java index 976ed970b2fd..47f1faec68cc 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java @@ -21,7 +21,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -73,17 +75,30 @@ public List getArgumentsSpecifications() { @Override public TableFunctionAnalysis analyze(Map arguments) throws UDFException { DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); - return TableFunctionAnalysis.builder().properColumnSchema(schema).build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + INPUT_PARAMETER_NAME, ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()); + handle.addProperty( + SPLIT_PARAMETER_NAME, ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionLeafProcessor getSplitProcessor() { return new SplitProcessor( - (String) ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue(), - (String) ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME), + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SPLIT_PARAMETER_NAME)); } }; } diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java index 177892e4e03f..17d94f54a13a 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java @@ -22,7 +22,9 @@ import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -95,13 +97,20 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .properColumnSchema( DescribedSchema.builder().addField("proper_column", Type.INT32).build()) .requiredColumns("TIMECHO", Collections.singletonList(1)) + .handle(new MapTableFunctionHandle()) .build(); } throw new UDFArgumentNotValidException("unexpected argument value"); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java index 35e35e085c89..49cc5a490d88 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyExcludeColumn.java @@ -21,7 +21,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -66,11 +68,18 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF return TableFunctionAnalysis.builder() .properColumnSchema(schemaBuilder.build()) .requiredColumns(TBL_PARAM, requiredColumns) + .handle(new MapTableFunctionHandle()) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java index 31cc06b44d02..62df7a7ea0db 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java @@ -23,7 +23,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -66,22 +68,31 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(N_PARAM, count.getValue()); return TableFunctionAnalysis.builder() .properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build()) .requiredColumns( TBL_PARAM, Collections.singletonList(0)) // per spec, function must require at least one column + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { - ScalarArgument count = (ScalarArgument) arguments.get("N"); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TableFunctionDataProcessor() { - private final int n = (int) count.getValue(); + private final int n = + (int) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(N_PARAM); private long recordIndex = 0; @Override diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java index 3b49c267c5c5..b60dda005091 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java @@ -23,7 +23,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; @@ -65,21 +67,30 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(N_PARAM, count.getValue()); return TableFunctionAnalysis.builder() .requiredColumns( TBL_PARAM, Collections.singletonList(0)) // per spec, function must require at least one column + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { - ScalarArgument count = (ScalarArgument) arguments.get("N"); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TableFunctionDataProcessor() { - private final int n = (int) count.getValue(); + private final int n = + (int) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(N_PARAM); private long recordIndex = 0; @Override diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java index d9a395eaa25e..f243c259d925 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java @@ -21,7 +21,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -57,17 +59,30 @@ public List getArgumentsSpecifications() { @Override public TableFunctionAnalysis analyze(Map arguments) throws UDFException { DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); - return TableFunctionAnalysis.builder().properColumnSchema(schema).build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + INPUT_PARAMETER_NAME, ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()); + handle.addProperty( + SPLIT_PARAMETER_NAME, ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionLeafProcessor getSplitProcessor() { return new SplitProcessor( - (String) ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue(), - (String) ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME), + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SPLIT_PARAMETER_NAME)); } }; } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java new file mode 100644 index 000000000000..8fed0b6acd12 --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.relational; + +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; + +public class EmptyTableFunctionHandle implements TableFunctionHandle { + @Override + public byte[] serialize() { + return new byte[0]; + } + + @Override + public void deserialize(byte[] bytes) {} +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java index 8c81840a4f83..a79c4336708c 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/TableFunction.java @@ -21,6 +21,7 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; @@ -81,6 +82,8 @@ public interface TableFunction extends SQLFunction { *
  • A description of proper columns. *
  • A map indicating which columns from the input table arguments are required for the * function to execute. + *
  • A TableFunctionExecutionInfo which stores all information necessary to execute the + * table function. * * * @@ -91,13 +94,16 @@ public interface TableFunction extends SQLFunction { */ TableFunctionAnalysis analyze(Map arguments) throws UDFException; + TableFunctionHandle createTableFunctionHandle(); + /** * This method is used to obtain a {@link TableFunctionProcessorProvider} that will be responsible * for creating processors to handle the transformation of input data into output table. The * provider is initialized with the validated arguments. * - * @param arguments a map of argument names to their corresponding {@link Argument} values + * @param tableFunctionHandle the object containing the execution information, which is generated + * in the {@link TableFunction#analyze} process. * @return a {@link TableFunctionProcessorProvider} for creating processors */ - TableFunctionProcessorProvider getProcessorProvider(Map arguments); + TableFunctionProcessorProvider getProcessorProvider(TableFunctionHandle tableFunctionHandle); } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java new file mode 100644 index 000000000000..d4946e5e2c86 --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.udf.api.relational.table; + +import org.apache.iotdb.udf.api.type.Type; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +public class MapTableFunctionHandle implements TableFunctionHandle { + private static final Set> SUPPORT_VALUE_TYPE = + new HashSet<>( + Arrays.asList( + Integer.class, Long.class, Double.class, Float.class, String.class, Double.class)); + private final Map map = new HashMap<>(); + + public void addProperty(String key, Object value) { + if (!SUPPORT_VALUE_TYPE.contains(value.getClass())) { + throw new IllegalArgumentException("Unsupported value type."); + } + map.put(key, value); + } + + public Object getProperty(String key) { + return map.get(key); + } + + @Override + public byte[] serialize() { + ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + buffer.putInt(map.size()); + for (Map.Entry entry : map.entrySet()) { + byte[] bytes = entry.getKey().getBytes(StandardCharsets.UTF_8); + buffer.putInt(bytes.length); + buffer.put(bytes); + if (entry.getValue() instanceof Long) { + buffer.put(Type.INT64.getType()); + buffer.putLong((Long) entry.getValue()); + } else if (entry.getValue() instanceof Integer) { + buffer.put(Type.INT32.getType()); + buffer.putInt((Integer) entry.getValue()); + } else if (entry.getValue() instanceof Double) { + buffer.put(Type.DOUBLE.getType()); + buffer.putDouble((Double) entry.getValue()); + } else if (entry.getValue() instanceof Float) { + buffer.put(Type.FLOAT.getType()); + buffer.putFloat((Float) entry.getValue()); + } else if (entry.getValue() instanceof Boolean) { + buffer.put(Type.BOOLEAN.getType()); + buffer.put(Boolean.TRUE.equals(entry.getValue()) ? (byte) 1 : (byte) 0); + } else if (entry.getValue() instanceof String) { + buffer.put(Type.STRING.getType()); + bytes = ((String) entry.getValue()).getBytes(StandardCharsets.UTF_8); + buffer.putInt(bytes.length); + buffer.put(bytes); + } + } + return buffer.array(); + } + + @Override + public void deserialize(byte[] bytes) { + ByteBuffer buffer = ByteBuffer.wrap(bytes); + int size = buffer.getInt(); + for (int i = 0; i < size; i++) { + byte[] b = new byte[buffer.getInt()]; + buffer.get(b); + String key = new String(b, StandardCharsets.UTF_8); + Type type = Type.valueOf(buffer.get()); + switch (type) { + case BOOLEAN: + map.put(key, buffer.get() != 0); + break; + case INT32: + map.put(key, buffer.getInt()); + break; + case INT64: + map.put(key, buffer.getLong()); + break; + case FLOAT: + map.put(key, buffer.getFloat()); + break; + case DOUBLE: + map.put(key, buffer.getDouble()); + break; + case STRING: + b = new byte[buffer.getInt()]; + buffer.get(b); + map.put(key, new String(b, StandardCharsets.UTF_8)); + break; + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("MapTableFunctionHandle{"); + for (Map.Entry entry : map.entrySet()) { + sb.append(entry.getKey()).append("=").append(entry.getValue()).append(", "); + } + if (sb.length() > 2) { + sb.setLength(sb.length() - 2); // remove last comma and space + } + sb.append('}'); + return sb.toString(); + } +} diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java index bff404c1623e..630fa814fca0 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionAnalysis.java @@ -58,13 +58,17 @@ public class TableFunctionAnalysis { */ private final boolean requireRecordSnapshot; + private final TableFunctionHandle handle; + private TableFunctionAnalysis( Optional properColumnSchema, Map> requiredColumns, - boolean requiredRecordSnapshot) { + boolean requiredRecordSnapshot, + TableFunctionHandle handle) { this.properColumnSchema = requireNonNull(properColumnSchema, "returnedType is null"); this.requiredColumns = requiredColumns; this.requireRecordSnapshot = requiredRecordSnapshot; + this.handle = requireNonNull(handle, "TableFunctionHandle is null"); } public Optional getProperColumnSchema() { @@ -79,6 +83,10 @@ public boolean isRequireRecordSnapshot() { return requireRecordSnapshot; } + public TableFunctionHandle getTableFunctionHandle() { + return handle; + } + public static Builder builder() { return new Builder(); } @@ -87,6 +95,7 @@ public static final class Builder { private DescribedSchema properColumnSchema; private final Map> requiredColumns = new HashMap<>(); private boolean requireRecordSnapshot = true; + private TableFunctionHandle executionInfo; private Builder() {} @@ -105,9 +114,17 @@ public Builder requireRecordSnapshot(boolean requireRecordSnapshot) { return this; } + public Builder handle(TableFunctionHandle executionInfo) { + this.executionInfo = executionInfo; + return this; + } + public TableFunctionAnalysis build() { return new TableFunctionAnalysis( - Optional.ofNullable(properColumnSchema), requiredColumns, requireRecordSnapshot); + Optional.ofNullable(properColumnSchema), + requiredColumns, + requireRecordSnapshot, + executionInfo); } } } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java new file mode 100644 index 000000000000..61b4f1fbf1df --- /dev/null +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java @@ -0,0 +1,19 @@ +package org.apache.iotdb.udf.api.relational.table; + +/** + * An area to store all information necessary to execute the table function, gathered at analysis + * time + */ +public interface TableFunctionHandle { + /** + * Serialize your state into byte array. The order of serialization must be consistent with + * deserialization. + */ + byte[] serialize(); + + /** + * Deserialize byte array into your state. The order of deserialization must be consistent with + * serialization. + */ + void deserialize(byte[] bytes); +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index ee6a13a006c3..3bc8f5a7e884 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -2862,7 +2862,7 @@ public Operator visitTableFunctionProcessor( TableFunctionProcessorNode node, LocalExecutionPlanContext context) { TableFunction tableFunction = metadata.getTableFunction(node.getName()); TableFunctionProcessorProvider processorProvider = - tableFunction.getProcessorProvider(node.getArguments()); + tableFunction.getProcessorProvider(node.getTableFunctionHandle()); if (node.getChildren().isEmpty()) { List outputDataTypes = node.getOutputSymbols().stream() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java index ec8279853bdd..d248448d018a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java @@ -4140,6 +4140,7 @@ public Scope visitTableFunctionInvocation(TableFunctionInvocation node, Optional new TableFunctionInvocationAnalysis( node.getName().toString(), argumentsAnalysis.getPassedArguments(), + functionAnalysis.getTableFunctionHandle(), orderedTableArguments.build(), requiredColumns, properSchema.map(describedSchema -> describedSchema.getFields().size()).orElse(0), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java index 73900573fa97..1740ac2120a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/tablefunction/TableFunctionInvocationAnalysis.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.plan.relational.analyzer.tablefunction; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import com.google.common.collect.ImmutableList; @@ -29,6 +30,7 @@ public class TableFunctionInvocationAnalysis { private final String functionName; private final Map passedArguments; + private final TableFunctionHandle tableFunctionHandle; private final List tableArgumentAnalyses; private final Map> requiredColumns; private final int properColumnsCount; @@ -37,12 +39,14 @@ public class TableFunctionInvocationAnalysis { public TableFunctionInvocationAnalysis( String name, Map passedArguments, + TableFunctionHandle tableFunctionHandle, ImmutableList tableArgumentAnalyses, Map> requiredColumns, int properColumnsCount, boolean requiredRecordSnapshot) { this.functionName = name; this.passedArguments = passedArguments; + this.tableFunctionHandle = tableFunctionHandle; this.tableArgumentAnalyses = tableArgumentAnalyses; this.requiredColumns = requiredColumns; this.properColumnsCount = properColumnsCount; @@ -65,6 +69,10 @@ public Map getPassedArguments() { return passedArguments; } + public TableFunctionHandle getTableFunctionHandle() { + return tableFunctionHandle; + } + public int getProperColumnsCount() { return properColumnsCount; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index 4d14ebaf06bf..a5f2a79a3fe7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -919,6 +919,7 @@ public RelationPlan visitTableFunctionInvocation(TableFunctionInvocation node, V idAllocator.genPlanNodeId(), functionAnalysis.getFunctionName(), functionAnalysis.getPassedArguments(), + functionAnalysis.getTableFunctionHandle(), properOutputs, sources.build(), sourceProperties.build()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java index e1079d9d5b31..f40e7c80e7de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java @@ -74,6 +74,7 @@ public Result apply(TableFunctionNode node, Captures captures, Context context) Optional.empty(), false, node.getArguments(), + node.getTableFunctionHandle(), false)); } else if (node.getChildren().size() == 1) { // Single source does not require pre-processing. @@ -126,6 +127,7 @@ public Result apply(TableFunctionNode node, Captures captures, Context context) sourceProperties.getDataOrganizationSpecification(), sourceProperties.isRowSemantics(), node.getArguments(), + node.getTableFunctionHandle(), sourceProperties.isRequireRecordSnapshot())); } else { // we don't support multiple source now. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java index 60792dad57d1..3d65469f02c6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java @@ -83,6 +83,7 @@ protected Optional pushDownProjectOff( node.getDataOrganizationSpecification(), node.isRowSemantic(), node.getArguments(), + node.getTableFunctionHandle(), node.isRequireRecordSnapshot())); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java index 6de6efb55fb2..a1d55cf492c7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java @@ -92,6 +92,7 @@ public Result apply(TableFunctionProcessorNode node, Captures captures, Context node.getDataOrganizationSpecification(), node.isRowSemantic(), node.getArguments(), + node.getTableFunctionHandle(), node.isRequireRecordSnapshot()))) .orElse(Result.empty()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java index e1326d97a9cc..65034f012809 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java @@ -24,8 +24,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import com.google.common.collect.ImmutableList; @@ -48,6 +50,7 @@ public class TableFunctionNode extends MultiChildProcessNode { private final String name; private final Map arguments; + private final TableFunctionHandle tableFunctionHandle; private final List properOutputs; private final List tableArgumentProperties; @@ -55,12 +58,14 @@ public TableFunctionNode( PlanNodeId id, String name, Map arguments, + TableFunctionHandle tableFunctionHandle, List properOutputs, List children, List tableArgumentProperties) { super(id, children); this.name = requireNonNull(name, "name is null"); this.arguments = ImmutableMap.copyOf(arguments); + this.tableFunctionHandle = tableFunctionHandle; this.properOutputs = ImmutableList.copyOf(properOutputs); this.tableArgumentProperties = ImmutableList.copyOf(tableArgumentProperties); } @@ -69,11 +74,13 @@ public TableFunctionNode( PlanNodeId id, String name, Map arguments, + TableFunctionHandle tableFunctionHandle, List properOutputs, List tableArgumentProperties) { super(id); this.name = requireNonNull(name, "name is null"); this.arguments = ImmutableMap.copyOf(arguments); + this.tableFunctionHandle = tableFunctionHandle; this.properOutputs = ImmutableList.copyOf(properOutputs); this.tableArgumentProperties = ImmutableList.copyOf(tableArgumentProperties); } @@ -86,6 +93,10 @@ public Map getArguments() { return arguments; } + public TableFunctionHandle getTableFunctionHandle() { + return tableFunctionHandle; + } + public List getProperOutputs() { return properOutputs; } @@ -96,7 +107,8 @@ public List getTableArgumentProperties() { @Override public PlanNode clone() { - return new TableFunctionNode(id, name, arguments, properOutputs, tableArgumentProperties); + return new TableFunctionNode( + id, name, arguments, tableFunctionHandle, properOutputs, tableArgumentProperties); } @Override @@ -130,7 +142,13 @@ public List getOutputColumnNames() { public PlanNode replaceChildren(List newSources) { checkArgument(children.size() == newSources.size(), "wrong number of new children"); return new TableFunctionNode( - getPlanNodeId(), name, arguments, properOutputs, newSources, tableArgumentProperties); + getPlanNodeId(), + name, + arguments, + tableFunctionHandle, + properOutputs, + newSources, + tableArgumentProperties); } @Override @@ -142,6 +160,9 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { ReadWriteIOUtils.write(entry.getKey(), byteBuffer); entry.getValue().serialize(byteBuffer); } + byte[] bytes = tableFunctionHandle.serialize(); + ReadWriteIOUtils.write(bytes.length, byteBuffer); + ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), byteBuffer); ReadWriteIOUtils.write(properOutputs.size(), byteBuffer); properOutputs.forEach(symbol -> Symbol.serialize(symbol, byteBuffer)); ReadWriteIOUtils.write(tableArgumentProperties.size(), byteBuffer); @@ -170,6 +191,9 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(entry.getKey(), stream); entry.getValue().serialize(stream); } + byte[] bytes = tableFunctionHandle.serialize(); + ReadWriteIOUtils.write(bytes.length, stream); + ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), stream); ReadWriteIOUtils.write(properOutputs.size(), stream); for (Symbol symbol : properOutputs) { Symbol.serialize(symbol, stream); @@ -201,6 +225,12 @@ public static TableFunctionNode deserialize(ByteBuffer byteBuffer) { arguments.put(key, value); } size = ReadWriteIOUtils.readInt(byteBuffer); + byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, size); + TableFunctionHandle tableFunctionHandle = + new TableMetadataImpl().getTableFunction(name).createTableFunctionHandle(); + tableFunctionHandle.deserialize(bytes); + // TODO: how to get table function handle instance more elegantly? + size = ReadWriteIOUtils.readInt(byteBuffer); ImmutableList.Builder properOutputs = ImmutableList.builder(); for (int i = 0; i < size; i++) { properOutputs.add(Symbol.deserialize(byteBuffer)); @@ -238,6 +268,7 @@ public static TableFunctionNode deserialize(ByteBuffer byteBuffer) { planNodeId, name, arguments.build(), + tableFunctionHandle, properOutputs.build(), tableArgumentProperties.build()); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java index 57184818de75..260578f5f70c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java @@ -24,8 +24,10 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.SingleChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import com.google.common.collect.ImmutableList; @@ -65,6 +67,8 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { private final Map arguments; + private final TableFunctionHandle tableFunctionHandle; + private final boolean requireRecordSnapshot; public TableFunctionProcessorNode( @@ -77,6 +81,7 @@ public TableFunctionProcessorNode( Optional dataOrganizationSpecification, boolean rowSemantic, Map arguments, + TableFunctionHandle tableFunctionHandle, boolean requireRecordSnapshot) { super(id, source.orElse(null)); this.name = requireNonNull(name, "name is null"); @@ -87,6 +92,7 @@ public TableFunctionProcessorNode( requireNonNull(dataOrganizationSpecification, "specification is null"); this.rowSemantic = rowSemantic; this.arguments = ImmutableMap.copyOf(arguments); + this.tableFunctionHandle = tableFunctionHandle; this.requireRecordSnapshot = requireRecordSnapshot; } @@ -99,6 +105,7 @@ public TableFunctionProcessorNode( Optional dataOrganizationSpecification, boolean rowSemantic, Map arguments, + TableFunctionHandle tableFunctionHandle, boolean requireRecordSnapshot) { super(id); this.name = requireNonNull(name, "name is null"); @@ -109,6 +116,7 @@ public TableFunctionProcessorNode( requireNonNull(dataOrganizationSpecification, "specification is null"); this.rowSemantic = rowSemantic; this.arguments = ImmutableMap.copyOf(arguments); + this.tableFunctionHandle = tableFunctionHandle; this.requireRecordSnapshot = requireRecordSnapshot; } @@ -140,6 +148,10 @@ public Map getArguments() { return arguments; } + public TableFunctionHandle getTableFunctionHandle() { + return tableFunctionHandle; + } + public boolean isRequireRecordSnapshot() { return requireRecordSnapshot; } @@ -155,6 +167,7 @@ public PlanNode clone() { dataOrganizationSpecification, rowSemantic, arguments, + tableFunctionHandle, requireRecordSnapshot); } @@ -210,6 +223,9 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { ReadWriteIOUtils.write(key, byteBuffer); value.serialize(byteBuffer); }); + byte[] bytes = tableFunctionHandle.serialize(); + ReadWriteIOUtils.write(bytes.length, byteBuffer); + ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), byteBuffer); ReadWriteIOUtils.write(requireRecordSnapshot, byteBuffer); } @@ -239,6 +255,9 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { ReadWriteIOUtils.write(entry.getKey(), stream); entry.getValue().serialize(stream); } + byte[] bytes = tableFunctionHandle.serialize(); + ReadWriteIOUtils.write(bytes.length, stream); + ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), stream); ReadWriteIOUtils.write(requireRecordSnapshot, stream); } @@ -272,6 +291,11 @@ public static TableFunctionProcessorNode deserialize(ByteBuffer byteBuffer) { Argument value = Argument.deserialize(byteBuffer); arguments.put(key, value); } + byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, size); + TableFunctionHandle tableFunctionHandle = + new TableMetadataImpl().getTableFunction(name).createTableFunctionHandle(); + tableFunctionHandle.deserialize(bytes); + // TODO: how to get table function handle instance more elegantly? boolean requireRecordSnapshot = ReadWriteIOUtils.readBoolean(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); @@ -284,6 +308,7 @@ public static TableFunctionProcessorNode deserialize(ByteBuffer byteBuffer) { dataOrganizationSpecification, rowSemantic, arguments, + tableFunctionHandle, requireRecordSnapshot); } @@ -301,6 +326,7 @@ public PlanNode replaceChildren(List newSources) { dataOrganizationSpecification, rowSemantic, arguments, + tableFunctionHandle, requireRecordSnapshot); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index d12db4d30340..1f13f5b893a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -871,6 +871,7 @@ public PlanAndMappings visitTableFunction(TableFunctionNode node, UnaliasContext node.getPlanNodeId(), node.getName(), node.getArguments(), + node.getTableFunctionHandle(), newProperOutputs, newSources.build(), newTableArgumentProperties.build()), @@ -894,6 +895,7 @@ public PlanAndMappings visitTableFunctionProcessor( Optional.empty(), node.isRowSemantic(), node.getArguments(), + node.getTableFunctionHandle(), node.isRequireRecordSnapshot()), mapping); } @@ -931,6 +933,7 @@ public PlanAndMappings visitTableFunctionProcessor( newSpecification, node.isRowSemantic(), node.getArguments(), + node.getTableFunctionHandle(), node.isRequireRecordSnapshot()); return new PlanAndMappings(rewrittenTableFunctionProcessor, mapping); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java index 8fd86f684bf5..bc5289da14ab 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Exclude.java @@ -20,8 +20,10 @@ package org.apache.iotdb.db.queryengine.plan.function; import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -66,11 +68,18 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF return TableFunctionAnalysis.builder() .properColumnSchema(schemaBuilder.build()) .requiredColumns(TBL_PARAM, requiredColumns) + .handle(new EmptyTableFunctionHandle()) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new EmptyTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java index 0682fd83d53f..766fdffcd3de 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java @@ -23,7 +23,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -62,22 +64,32 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } + + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(N_PARAM, count.getValue()); return TableFunctionAnalysis.builder() .properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build()) .requiredColumns( TBL_PARAM, Collections.singletonList(0)) // per spec, function must require at least one column + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { - ScalarArgument count = (ScalarArgument) arguments.get("N"); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TableFunctionDataProcessor() { - private final int n = (int) count.getValue(); + private final int n = + (int) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(N_PARAM); private long recordIndex = 0; @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java index fede9cae8a7a..ef86ccb30284 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java @@ -21,7 +21,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -57,17 +59,30 @@ public List getArgumentsSpecifications() { @Override public TableFunctionAnalysis analyze(Map arguments) throws UDFException { DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); - return TableFunctionAnalysis.builder().properColumnSchema(schema).build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + INPUT_PARAMETER_NAME, ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()); + handle.addProperty( + SPLIT_PARAMETER_NAME, ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionLeafProcessor getSplitProcessor() { return new SplitProcessor( - (String) ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue(), - (String) ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(INPUT_PARAMETER_NAME), + (String) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SPLIT_PARAMETER_NAME)); } }; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java index f5d84c37ada8..630273efbb07 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java @@ -22,7 +22,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -60,33 +62,43 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF if (size <= 0) { throw new UDFException("Size must be greater than 0"); } + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(SIZE_PARAMETER_NAME, size); return TableFunctionAnalysis.builder() .properColumnSchema( new DescribedSchema.Builder().addField("window_index", Type.INT64).build()) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(0)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { - long sz = (long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue(); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + long sz = + (long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new CountDataProcessor(sz); + return new CapacityDataProcessor(sz); } }; } - private static class CountDataProcessor implements TableFunctionDataProcessor { + private static class CapacityDataProcessor implements TableFunctionDataProcessor { private final long size; private long currentStartIndex = 0; private long curIndex = 0; private long windowIndex = 0; - public CountDataProcessor(long size) { + public CapacityDataProcessor(long size) { this.size = size; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java index acb3e588f057..b37ea8450d59 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java @@ -22,7 +22,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -94,24 +96,36 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .addField("window_start", Type.TIMESTAMP) .addField("window_end", Type.TIMESTAMP) .build(); - + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty(STEP_PARAMETER_NAME, step); + handle.addProperty(SIZE_PARAMETER_NAME, size); + handle.addProperty( + ORIGIN_PARAMETER_NAME, ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new CumulateDataProcessor( - (Long) ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(STEP_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + (Long) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(ORIGIN_PARAMETER_NAME), + (Long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(STEP_PARAMETER_NAME), + (Long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME)); } }; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java index b2178c5805a2..382fd349dbad 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java @@ -21,7 +21,9 @@ import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -84,24 +86,38 @@ public TableFunctionAnalysis analyze(Map arguments) { .addField("window_start", Type.TIMESTAMP) .addField("window_end", Type.TIMESTAMP) .build(); - + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + ORIGIN_PARAMETER_NAME, ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()); + handle.addProperty( + SLIDE_PARAMETER_NAME, ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue()); + handle.addProperty( + SIZE_PARAMETER_NAME, ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + MapTableFunctionHandle mapTableFunctionHandle = (MapTableFunctionHandle) tableFunctionHandle; return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new HOPDataProcessor( - (Long) ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + (Long) mapTableFunctionHandle.getProperty(ORIGIN_PARAMETER_NAME), + (Long) mapTableFunctionHandle.getProperty(SLIDE_PARAMETER_NAME), + (Long) mapTableFunctionHandle.getProperty(SIZE_PARAMETER_NAME)); } }; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java index 9a7ec4577abd..4133d36c8da0 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java @@ -22,7 +22,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -75,17 +77,28 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .addField("window_end", Type.TIMESTAMP) .build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + GAP_PARAMETER_NAME, ((ScalarArgument) arguments.get(GAP_PARAMETER_NAME)).getValue()); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { - long gap = (long) ((ScalarArgument) arguments.get(GAP_PARAMETER_NAME)).getValue(); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + long gap = + (long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(GAP_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java index a239c694129f..161e2b040c24 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java @@ -22,7 +22,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -83,22 +85,35 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .addField("window_end", Type.TIMESTAMP) .build(); + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + ORIGIN_PARAMETER_NAME, ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()); + handle.addProperty( + SIZE_PARAMETER_NAME, ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { return new TumbleDataProcessor( - (Long) ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue(), - (Long) ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + (Long) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(ORIGIN_PARAMETER_NAME), + (Long) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(SIZE_PARAMETER_NAME)); } }; } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java index 189ef0bc469f..6faf47289b82 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java @@ -22,7 +22,9 @@ import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -76,16 +78,27 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF DescribedSchema properColumnSchema = new DescribedSchema.Builder().addField("window_index", Type.INT64).build(); // outputColumnSchema + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.addProperty( + DELTA_PARAMETER_NAME, ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue()); return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) .requiredColumns(DATA_PARAMETER_NAME, Collections.singletonList(requiredIndex)) + .handle(handle) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { - double delta = (double) ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue(); + public TableFunctionHandle createTableFunctionHandle() { + return new MapTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { + double delta = + (double) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(DELTA_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { From 4d9a2df542645f0e429ce03847775ad0ba6d8cd9 Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Wed, 23 Apr 2025 00:57:30 +0900 Subject: [PATCH 2/5] Add license --- .../relational/table/TableFunctionHandle.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java index 61b4f1fbf1df..86f85688f2a7 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/TableFunctionHandle.java @@ -1,3 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.iotdb.udf.api.relational.table; /** From 21be739005136d024cca1df5434df9ce8bc88243 Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Wed, 23 Apr 2025 23:15:04 +0900 Subject: [PATCH 3/5] remove arguments --- .../apache/iotdb/udf/table/RepeatExample.java | 4 +- .../apache/iotdb/udf/table/SplitExample.java | 14 +- .../relational/MyErrorTableFunction.java | 4 + .../example/relational/MyRepeatWithIndex.java | 4 +- .../relational/MyRepeatWithoutIndex.java | 4 +- .../query/udf/example/relational/MySplit.java | 14 +- .../relational/EmptyTableFunctionHandle.java | 5 + .../table/MapTableFunctionHandle.java | 32 ++++ .../planner/plan/node/PlanGraphPrinter.java | 30 +-- .../relational/planner/RelationPlanner.java | 1 - .../rule/ImplementTableFunctionSource.java | 2 - .../PruneTableFunctionProcessorColumns.java | 1 - ...neTableFunctionProcessorSourceColumns.java | 1 - .../planner/node/TableFunctionNode.java | 34 +--- .../node/TableFunctionProcessorNode.java | 35 ---- .../UnaliasSymbolReferences.java | 3 - .../db/queryengine/plan/function/Repeat.java | 4 +- .../db/queryengine/plan/function/Split.java | 14 +- .../analyzer/TableFunctionTest.java | 75 ++++---- .../TableFunctionProcessorMatcher.java | 178 ++---------------- .../relational/tvf/CapacityTableFunction.java | 4 +- .../relational/tvf/CumulateTableFunction.java | 13 +- .../relational/tvf/HOPTableFunction.java | 19 +- .../relational/tvf/SessionTableFunction.java | 8 +- .../relational/tvf/TumbleTableFunction.java | 14 +- .../tvf/VariationTableFunction.java | 9 +- 26 files changed, 163 insertions(+), 363 deletions(-) diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java index 3837f80e6e42..f042716a4917 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/RepeatExample.java @@ -83,8 +83,8 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty(N_PARAM, count.getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder().addProperty(N_PARAM, count.getValue()).build(); return TableFunctionAnalysis.builder() .properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build()) .requiredColumns( diff --git a/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java b/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java index 47f1faec68cc..03c46ad4a086 100644 --- a/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java +++ b/example/udf/src/main/java/org/apache/iotdb/udf/table/SplitExample.java @@ -75,11 +75,15 @@ public List getArgumentsSpecifications() { @Override public TableFunctionAnalysis analyze(Map arguments) throws UDFException { DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty( - INPUT_PARAMETER_NAME, ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()); - handle.addProperty( - SPLIT_PARAMETER_NAME, ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty( + INPUT_PARAMETER_NAME, + ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()) + .addProperty( + SPLIT_PARAMETER_NAME, + ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()) + .build(); return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); } diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java index 17d94f54a13a..1dd44653ecb4 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyErrorTableFunction.java @@ -69,6 +69,7 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF return TableFunctionAnalysis.builder() .properColumnSchema( DescribedSchema.builder().addField("proper_column", Type.INT32).build()) + .handle(new MapTableFunctionHandle()) .build(); } else if (nValue == 1) { // set empty required columns @@ -76,6 +77,7 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .properColumnSchema( DescribedSchema.builder().addField("proper_column", Type.INT32).build()) .requiredColumns(TBL_PARAM, Collections.emptyList()) + .handle(new MapTableFunctionHandle()) .build(); } else if (nValue == 2) { // set negative required columns @@ -83,6 +85,7 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .properColumnSchema( DescribedSchema.builder().addField("proper_column", Type.INT32).build()) .requiredColumns(TBL_PARAM, Collections.singletonList(-1)) + .handle(new MapTableFunctionHandle()) .build(); } else if (nValue == 3) { // set required columns out of bound (0~10) @@ -90,6 +93,7 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .properColumnSchema( DescribedSchema.builder().addField("proper_column", Type.INT32).build()) .requiredColumns(TBL_PARAM, IntStream.range(0, 11).boxed().collect(Collectors.toList())) + .handle(new MapTableFunctionHandle()) .build(); } else if (nValue == 4) { // specify required columns to unknown table diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java index 62df7a7ea0db..8fe032bfb5e9 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithIndex.java @@ -68,8 +68,8 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty(N_PARAM, count.getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder().addProperty(N_PARAM, count.getValue()).build(); return TableFunctionAnalysis.builder() .properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build()) .requiredColumns( diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java index b60dda005091..42b2e331dd32 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MyRepeatWithoutIndex.java @@ -67,8 +67,8 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF throw new UDFArgumentNotValidException( "count argument for function repeat() must be positive"); } - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty(N_PARAM, count.getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder().addProperty(N_PARAM, count.getValue()).build(); return TableFunctionAnalysis.builder() .requiredColumns( TBL_PARAM, diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java index f243c259d925..6039b8df75e0 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySplit.java @@ -59,11 +59,15 @@ public List getArgumentsSpecifications() { @Override public TableFunctionAnalysis analyze(Map arguments) throws UDFException { DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty( - INPUT_PARAMETER_NAME, ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()); - handle.addProperty( - SPLIT_PARAMETER_NAME, ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty( + INPUT_PARAMETER_NAME, + ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()) + .addProperty( + SPLIT_PARAMETER_NAME, + ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()) + .build(); return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java index 8fed0b6acd12..8cba5ba38579 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/EmptyTableFunctionHandle.java @@ -29,4 +29,9 @@ public byte[] serialize() { @Override public void deserialize(byte[] bytes) {} + + @Override + public boolean equals(Object o) { + return o != null && getClass() == o.getClass(); + } } diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java index d4946e5e2c86..e656d68ad88c 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; public class MapTableFunctionHandle implements TableFunctionHandle { @@ -129,4 +130,35 @@ public String toString() { sb.append('}'); return sb.toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + MapTableFunctionHandle handle = (MapTableFunctionHandle) o; + return Objects.equals(map, handle.map); + } + + @Override + public int hashCode() { + return Objects.hashCode(map); + } + + public static class Builder { + private final Map map = new HashMap<>(); + + public Builder addProperty(String key, Object value) { + if (!SUPPORT_VALUE_TYPE.contains(value.getClass())) { + throw new IllegalArgumentException("Unsupported value type."); + } + map.put(key, value); + return this; + } + + public MapTableFunctionHandle build() { + MapTableFunctionHandle handle = new MapTableFunctionHandle(); + handle.map.putAll(map); + return handle; + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java index d9eef926e923..16f2f45930fd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanGraphPrinter.java @@ -82,9 +82,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode; -import org.apache.iotdb.udf.api.relational.table.argument.Argument; -import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; -import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; import com.google.common.base.Joiner; import org.apache.commons.lang3.Validate; @@ -99,7 +96,6 @@ import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; -import static java.lang.String.format; import static org.apache.iotdb.db.utils.DateTimeUtils.TIMESTAMP_PRECISION; public class PlanGraphPrinter extends PlanVisitor, PlanGraphPrinter.GraphContext> { @@ -1045,34 +1041,10 @@ public List visitTableFunctionProcessor( .getOrderingScheme() .ifPresent(orderingScheme -> boxValue.add("Order by: " + orderingScheme)); }); - if (!node.getArguments().isEmpty()) { - node.getArguments().forEach((key, value) -> boxValue.add(formatArgument(key, value))); - } + boxValue.add("TableFunctionHandle: " + node.getTableFunctionHandle()); return render(node, boxValue, context); } - private String formatArgument(String argumentName, Argument argument) { - if (argument instanceof ScalarArgument) { - return formatScalarArgument(argumentName, (ScalarArgument) argument); - } else if (argument instanceof TableArgument) { - return formatTableArgument(argumentName, (TableArgument) argument); - } else { - return argumentName + " => " + argument; - } - } - - private String formatScalarArgument(String argumentName, ScalarArgument argument) { - return format( - "%s => ScalarArgument{type=%s, value=%s}", - argumentName, argument.getType(), argument.getValue()); - } - - private String formatTableArgument(String argumentName, TableArgument argument) { - return format( - "%s => TableArgument{%s}", - argumentName, argument.isRowSemantics() ? "row semantics" : "set semantics"); - } - private String printRegion(TRegionReplicaSet regionReplicaSet) { return String.format( "Partition: %s", diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index a5f2a79a3fe7..8a7262860156 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -918,7 +918,6 @@ public RelationPlan visitTableFunctionInvocation(TableFunctionInvocation node, V new TableFunctionNode( idAllocator.genPlanNodeId(), functionAnalysis.getFunctionName(), - functionAnalysis.getPassedArguments(), functionAnalysis.getTableFunctionHandle(), properOutputs, sources.build(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java index f40e7c80e7de..87df600fe014 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/ImplementTableFunctionSource.java @@ -73,7 +73,6 @@ public Result apply(TableFunctionNode node, Captures captures, Context context) ImmutableList.of(), Optional.empty(), false, - node.getArguments(), node.getTableFunctionHandle(), false)); } else if (node.getChildren().size() == 1) { @@ -126,7 +125,6 @@ public Result apply(TableFunctionNode node, Captures captures, Context context) sourceProperties.getRequiredColumns(), sourceProperties.getDataOrganizationSpecification(), sourceProperties.isRowSemantics(), - node.getArguments(), node.getTableFunctionHandle(), sourceProperties.isRequireRecordSnapshot())); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java index 3d65469f02c6..1886ed39e85d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorColumns.java @@ -82,7 +82,6 @@ protected Optional pushDownProjectOff( node.getRequiredSymbols(), node.getDataOrganizationSpecification(), node.isRowSemantic(), - node.getArguments(), node.getTableFunctionHandle(), node.isRequireRecordSnapshot())); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java index a1d55cf492c7..e9fd2e9db007 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/iterative/rule/PruneTableFunctionProcessorSourceColumns.java @@ -91,7 +91,6 @@ public Result apply(TableFunctionProcessorNode node, Captures captures, Context node.getRequiredSymbols(), node.getDataOrganizationSpecification(), node.isRowSemantic(), - node.getArguments(), node.getTableFunctionHandle(), node.isRequireRecordSnapshot()))) .orElse(Result.empty()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java index 65034f012809..a4db3d68d2ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionNode.java @@ -28,10 +28,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; -import org.apache.iotdb.udf.api.relational.table.argument.Argument; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; @@ -39,7 +37,6 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -49,7 +46,6 @@ public class TableFunctionNode extends MultiChildProcessNode { private final String name; - private final Map arguments; private final TableFunctionHandle tableFunctionHandle; private final List properOutputs; private final List tableArgumentProperties; @@ -57,14 +53,12 @@ public class TableFunctionNode extends MultiChildProcessNode { public TableFunctionNode( PlanNodeId id, String name, - Map arguments, TableFunctionHandle tableFunctionHandle, List properOutputs, List children, List tableArgumentProperties) { super(id, children); this.name = requireNonNull(name, "name is null"); - this.arguments = ImmutableMap.copyOf(arguments); this.tableFunctionHandle = tableFunctionHandle; this.properOutputs = ImmutableList.copyOf(properOutputs); this.tableArgumentProperties = ImmutableList.copyOf(tableArgumentProperties); @@ -73,13 +67,11 @@ public TableFunctionNode( public TableFunctionNode( PlanNodeId id, String name, - Map arguments, TableFunctionHandle tableFunctionHandle, List properOutputs, List tableArgumentProperties) { super(id); this.name = requireNonNull(name, "name is null"); - this.arguments = ImmutableMap.copyOf(arguments); this.tableFunctionHandle = tableFunctionHandle; this.properOutputs = ImmutableList.copyOf(properOutputs); this.tableArgumentProperties = ImmutableList.copyOf(tableArgumentProperties); @@ -89,10 +81,6 @@ public String getName() { return name; } - public Map getArguments() { - return arguments; - } - public TableFunctionHandle getTableFunctionHandle() { return tableFunctionHandle; } @@ -108,7 +96,7 @@ public List getTableArgumentProperties() { @Override public PlanNode clone() { return new TableFunctionNode( - id, name, arguments, tableFunctionHandle, properOutputs, tableArgumentProperties); + id, name, tableFunctionHandle, properOutputs, tableArgumentProperties); } @Override @@ -144,7 +132,6 @@ public PlanNode replaceChildren(List newSources) { return new TableFunctionNode( getPlanNodeId(), name, - arguments, tableFunctionHandle, properOutputs, newSources, @@ -155,11 +142,6 @@ public PlanNode replaceChildren(List newSources) { protected void serializeAttributes(ByteBuffer byteBuffer) { PlanNodeType.TABLE_FUNCTION_NODE.serialize(byteBuffer); ReadWriteIOUtils.write(name, byteBuffer); - ReadWriteIOUtils.write(arguments.size(), byteBuffer); - for (Map.Entry entry : arguments.entrySet()) { - ReadWriteIOUtils.write(entry.getKey(), byteBuffer); - entry.getValue().serialize(byteBuffer); - } byte[] bytes = tableFunctionHandle.serialize(); ReadWriteIOUtils.write(bytes.length, byteBuffer); ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), byteBuffer); @@ -186,11 +168,6 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { protected void serializeAttributes(DataOutputStream stream) throws IOException { PlanNodeType.TABLE_FUNCTION_NODE.serialize(stream); ReadWriteIOUtils.write(name, stream); - ReadWriteIOUtils.write(arguments.size(), stream); - for (Map.Entry entry : arguments.entrySet()) { - ReadWriteIOUtils.write(entry.getKey(), stream); - entry.getValue().serialize(stream); - } byte[] bytes = tableFunctionHandle.serialize(); ReadWriteIOUtils.write(bytes.length, stream); ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), stream); @@ -218,18 +195,10 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { public static TableFunctionNode deserialize(ByteBuffer byteBuffer) { String name = ReadWriteIOUtils.readString(byteBuffer); int size = ReadWriteIOUtils.readInt(byteBuffer); - ImmutableMap.Builder arguments = ImmutableMap.builder(); - for (int i = 0; i < size; i++) { - String key = ReadWriteIOUtils.readString(byteBuffer); - Argument value = Argument.deserialize(byteBuffer); - arguments.put(key, value); - } - size = ReadWriteIOUtils.readInt(byteBuffer); byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, size); TableFunctionHandle tableFunctionHandle = new TableMetadataImpl().getTableFunction(name).createTableFunctionHandle(); tableFunctionHandle.deserialize(bytes); - // TODO: how to get table function handle instance more elegantly? size = ReadWriteIOUtils.readInt(byteBuffer); ImmutableList.Builder properOutputs = ImmutableList.builder(); for (int i = 0; i < size; i++) { @@ -267,7 +236,6 @@ public static TableFunctionNode deserialize(ByteBuffer byteBuffer) { return new TableFunctionNode( planNodeId, name, - arguments.build(), tableFunctionHandle, properOutputs.build(), tableArgumentProperties.build()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java index 260578f5f70c..5172aa786a3a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java @@ -28,19 +28,15 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; -import org.apache.iotdb.udf.api.relational.table.argument.Argument; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import org.apache.tsfile.utils.ReadWriteIOUtils; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -65,8 +61,6 @@ public class TableFunctionProcessorNode extends SingleChildProcessNode { private final boolean rowSemantic; - private final Map arguments; - private final TableFunctionHandle tableFunctionHandle; private final boolean requireRecordSnapshot; @@ -80,7 +74,6 @@ public TableFunctionProcessorNode( List requiredSymbols, Optional dataOrganizationSpecification, boolean rowSemantic, - Map arguments, TableFunctionHandle tableFunctionHandle, boolean requireRecordSnapshot) { super(id, source.orElse(null)); @@ -91,7 +84,6 @@ public TableFunctionProcessorNode( this.dataOrganizationSpecification = requireNonNull(dataOrganizationSpecification, "specification is null"); this.rowSemantic = rowSemantic; - this.arguments = ImmutableMap.copyOf(arguments); this.tableFunctionHandle = tableFunctionHandle; this.requireRecordSnapshot = requireRecordSnapshot; } @@ -104,7 +96,6 @@ public TableFunctionProcessorNode( List requiredSymbols, Optional dataOrganizationSpecification, boolean rowSemantic, - Map arguments, TableFunctionHandle tableFunctionHandle, boolean requireRecordSnapshot) { super(id); @@ -115,7 +106,6 @@ public TableFunctionProcessorNode( this.dataOrganizationSpecification = requireNonNull(dataOrganizationSpecification, "specification is null"); this.rowSemantic = rowSemantic; - this.arguments = ImmutableMap.copyOf(arguments); this.tableFunctionHandle = tableFunctionHandle; this.requireRecordSnapshot = requireRecordSnapshot; } @@ -144,10 +134,6 @@ public Optional getDataOrganizationSpecification( return dataOrganizationSpecification; } - public Map getArguments() { - return arguments; - } - public TableFunctionHandle getTableFunctionHandle() { return tableFunctionHandle; } @@ -166,7 +152,6 @@ public PlanNode clone() { requiredSymbols, dataOrganizationSpecification, rowSemantic, - arguments, tableFunctionHandle, requireRecordSnapshot); } @@ -217,12 +202,6 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { dataOrganizationSpecification.get().serialize(byteBuffer); } ReadWriteIOUtils.write(rowSemantic, byteBuffer); - ReadWriteIOUtils.write(arguments.size(), byteBuffer); - arguments.forEach( - (key, value) -> { - ReadWriteIOUtils.write(key, byteBuffer); - value.serialize(byteBuffer); - }); byte[] bytes = tableFunctionHandle.serialize(); ReadWriteIOUtils.write(bytes.length, byteBuffer); ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), byteBuffer); @@ -250,11 +229,6 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { dataOrganizationSpecification.get().serialize(stream); } ReadWriteIOUtils.write(rowSemantic, stream); - ReadWriteIOUtils.write(arguments.size(), stream); - for (Map.Entry entry : arguments.entrySet()) { - ReadWriteIOUtils.write(entry.getKey(), stream); - entry.getValue().serialize(stream); - } byte[] bytes = tableFunctionHandle.serialize(); ReadWriteIOUtils.write(bytes.length, stream); ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), stream); @@ -285,17 +259,10 @@ public static TableFunctionProcessorNode deserialize(ByteBuffer byteBuffer) { : Optional.empty(); boolean rowSemantic = ReadWriteIOUtils.readBoolean(byteBuffer); size = ReadWriteIOUtils.readInt(byteBuffer); - Map arguments = new HashMap<>(size); - while (size-- > 0) { - String key = ReadWriteIOUtils.readString(byteBuffer); - Argument value = Argument.deserialize(byteBuffer); - arguments.put(key, value); - } byte[] bytes = ReadWriteIOUtils.readBytes(byteBuffer, size); TableFunctionHandle tableFunctionHandle = new TableMetadataImpl().getTableFunction(name).createTableFunctionHandle(); tableFunctionHandle.deserialize(bytes); - // TODO: how to get table function handle instance more elegantly? boolean requireRecordSnapshot = ReadWriteIOUtils.readBoolean(byteBuffer); PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer); @@ -307,7 +274,6 @@ public static TableFunctionProcessorNode deserialize(ByteBuffer byteBuffer) { requiredSymbols, dataOrganizationSpecification, rowSemantic, - arguments, tableFunctionHandle, requireRecordSnapshot); } @@ -325,7 +291,6 @@ public PlanNode replaceChildren(List newSources) { requiredSymbols, dataOrganizationSpecification, rowSemantic, - arguments, tableFunctionHandle, requireRecordSnapshot); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java index 1f13f5b893a2..b9ac030a8b8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/UnaliasSymbolReferences.java @@ -870,7 +870,6 @@ public PlanAndMappings visitTableFunction(TableFunctionNode node, UnaliasContext new TableFunctionNode( node.getPlanNodeId(), node.getName(), - node.getArguments(), node.getTableFunctionHandle(), newProperOutputs, newSources.build(), @@ -894,7 +893,6 @@ public PlanAndMappings visitTableFunctionProcessor( ImmutableList.of(), Optional.empty(), node.isRowSemantic(), - node.getArguments(), node.getTableFunctionHandle(), node.isRequireRecordSnapshot()), mapping); @@ -932,7 +930,6 @@ public PlanAndMappings visitTableFunctionProcessor( newRequiredSymbols, newSpecification, node.isRowSemantic(), - node.getArguments(), node.getTableFunctionHandle(), node.isRequireRecordSnapshot()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java index 766fdffcd3de..98cb45f84ef1 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Repeat.java @@ -65,8 +65,8 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF "count argument for function repeat() must be positive"); } - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty(N_PARAM, count.getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder().addProperty(N_PARAM, count.getValue()).build(); return TableFunctionAnalysis.builder() .properColumnSchema(DescribedSchema.builder().addField("repeat_index", Type.INT32).build()) .requiredColumns( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java index ef86ccb30284..931a64d89ad2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/function/Split.java @@ -59,11 +59,15 @@ public List getArgumentsSpecifications() { @Override public TableFunctionAnalysis analyze(Map arguments) throws UDFException { DescribedSchema schema = DescribedSchema.builder().addField("output", Type.STRING).build(); - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty( - INPUT_PARAMETER_NAME, ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()); - handle.addProperty( - SPLIT_PARAMETER_NAME, ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty( + INPUT_PARAMETER_NAME, + ((ScalarArgument) arguments.get(INPUT_PARAMETER_NAME)).getValue()) + .addProperty( + SPLIT_PARAMETER_NAME, + ((ScalarArgument) arguments.get(SPLIT_PARAMETER_NAME)).getValue()) + .build(); return TableFunctionAnalysis.builder().properColumnSchema(schema).handle(handle).build(); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java index 71854d8b5be1..63924ed0111d 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java @@ -24,6 +24,8 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern; import org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.TableFunctionProcessorMatcher; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; +import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle; +import org.apache.iotdb.udf.api.relational.table.MapTableFunctionHandle; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -44,10 +46,8 @@ import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.output; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.project; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.sort; -import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.specification; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableFunctionProcessor; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.PlanMatchPattern.tableScan; -import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.TableFunctionProcessorMatcher.TableArgumentValue.Builder.tableArgument; public class TableFunctionTest { @@ -73,16 +73,12 @@ public void testSimpleRowSemantic() { .name("hop") .properOutputs("window_start", "window_end") .requiredSymbols("time") - .addScalarArgument("TIMECOL", "time") - .addScalarArgument("SIZE", 3600000L) - .addScalarArgument("SLIDE", 1800000L) - .addScalarArgument("ORIGIN", 0L) - .addTableArgument( - "DATA", - tableArgument() - .rowSemantics() - .passThroughSymbols( - "time", "tag1", "tag2", "tag3", "attr1", "attr2", "s1", "s2", "s3")); + .handle( + new MapTableFunctionHandle.Builder() + .addProperty("SIZE", 3600000L) + .addProperty("SLIDE", 1800000L) + .addProperty("ORIGIN", 0L) + .build()); // Verify full LogicalPlan // Output - TableFunctionProcessor - TableScan assertPlan(logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher, tableScan))); @@ -129,14 +125,7 @@ public void testSimpleRowSemantic2() { .properOutputs("time", "tag1", "tag2", "tag3", "attr2", "s1", "s2", "s3") .requiredSymbols( "time_0", "tag1_1", "tag2_2", "tag3_3", "attr2_4", "s1_5", "s2_6", "s3_7") - .addScalarArgument("EXCLUDE", "attr1") - .addTableArgument( - "DATA", - tableArgument() - .specification( - specification( - ImmutableList.of(), ImmutableList.of(), ImmutableMap.of())) - .rowSemantics()); + .handle(new EmptyTableFunctionHandle()); // Verify full LogicalPlan // Output - TableFunctionProcessor - TableScan assertPlan(logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher, tableScan))); @@ -185,17 +174,7 @@ public void testSimpleSetSemantic() { .name("repeat") .properOutputs("repeat_index") .requiredSymbols("time") - .addScalarArgument("N", 2) - .addTableArgument( - "DATA", - tableArgument() - .specification( - specification( - ImmutableList.of("tag1", "tag2", "tag3"), - ImmutableList.of(), - ImmutableMap.of())) - .passThroughSymbols( - "time", "tag1", "tag2", "tag3", "attr1", "attr2", "s1", "s2", "s3")); + .handle(new MapTableFunctionHandle.Builder().addProperty("N", 2).build()); // Verify full LogicalPlan // Output - TableFunctionProcessor - GroupNode - TableScan assertPlan( @@ -241,7 +220,11 @@ public void testLeafFunction() { .name("split") .properOutputs("output") .requiredSymbols() - .addScalarArgument("INPUT", "1,2,3,4,5"); + .handle( + new MapTableFunctionHandle.Builder() + .addProperty("INPUT", "1,2,3,4,5") + .addProperty("SPLIT", ",") + .build()); // Verify full LogicalPlan // Output - TableFunctionProcessor - TableScan assertPlan(logicalQueryPlan, anyTree(tableFunctionProcessor(tableFunctionMatcher))); @@ -255,14 +238,22 @@ public void testLeafFunction() { .name("split") .properOutputs("output") .requiredSymbols() - .addScalarArgument("INPUT", "1,2,4,5"); + .handle( + new MapTableFunctionHandle.Builder() + .addProperty("INPUT", "1,2,4,5") + .addProperty("SPLIT", ",") + .build()); Consumer tableFunctionMatcher2 = builder -> builder .name("split") .properOutputs("output_0") .requiredSymbols() - .addScalarArgument("INPUT", "2,3,4"); + .handle( + new MapTableFunctionHandle.Builder() + .addProperty("INPUT", "2,3,4") + .addProperty("SPLIT", ",") + .build()); // Verify full LogicalPlan // Output - TableFunctionProcessor - TableScan assertPlan( @@ -301,21 +292,19 @@ public void testHybrid() { .properOutputs("time", "tag1", "tag2", "tag3", "attr2", "s1", "s2", "s3") .requiredSymbols( "time_0", "tag1_1", "tag2_2", "tag3_3", "attr2_4", "s1_5", "s2_6", "s3_7") - .addScalarArgument("EXCLUDE", "attr1") - .addTableArgument("DATA", tableArgument().rowSemantics()); + .handle(new EmptyTableFunctionHandle()); Consumer hopMatcher = builder -> builder .name("hop") .properOutputs("window_start", "window_end") .requiredSymbols("time") - .addScalarArgument("TIMECOL", "time") - .addScalarArgument("SIZE", 3600000L) - .addScalarArgument("SLIDE", 1800000L) - .addScalarArgument("ORIGIN", 0L) - .addTableArgument( - "DATA", - tableArgument().rowSemantics().passThroughSymbols("tag1", "tag2", "tag3")); + .handle( + new MapTableFunctionHandle.Builder() + .addProperty("SIZE", 3600000L) + .addProperty("SLIDE", 1800000L) + .addProperty("ORIGIN", 0L) + .build()); // Verify full LogicalPlan // Output - Aggregation - HOP - Project - EXCLUDE - TableScan assertPlan( diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/TableFunctionProcessorMatcher.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/TableFunctionProcessorMatcher.java index 52d553368f7b..c0c534f494d5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/TableFunctionProcessorMatcher.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/planner/assertions/TableFunctionProcessorMatcher.java @@ -22,29 +22,19 @@ import org.apache.iotdb.db.queryengine.common.SessionInfo; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.Metadata; -import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; -import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; -import org.apache.iotdb.udf.api.relational.table.argument.Argument; -import org.apache.iotdb.udf.api.relational.table.argument.ScalarArgument; -import org.apache.iotdb.udf.api.relational.table.argument.TableArgument; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Set; import static com.google.common.base.MoreObjects.toStringHelper; import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.MatchResult.NO_MATCH; import static org.apache.iotdb.db.queryengine.plan.relational.planner.assertions.MatchResult.match; @@ -54,17 +44,17 @@ public class TableFunctionProcessorMatcher implements Matcher { private final String name; private final List properOutputs; private final List requiredSymbols; - private final Map arguments; + private final TableFunctionHandle handle; private TableFunctionProcessorMatcher( String name, List properOutputs, List requiredSymbols, - Map arguments) { + TableFunctionHandle handle) { this.name = requireNonNull(name, "name is null"); this.properOutputs = ImmutableList.copyOf(properOutputs); this.requiredSymbols = ImmutableList.copyOf(requiredSymbols); - this.arguments = ImmutableMap.copyOf(arguments); + this.handle = handle; } @Override @@ -96,67 +86,8 @@ public MatchResult detailMatches( if (!expectedRequired.equals(actualRequired)) { return NO_MATCH; } - for (Map.Entry entry : arguments.entrySet()) { - String argumentName = entry.getKey(); - Argument actual = tableFunctionProcessorNode.getArguments().get(argumentName); - if (actual == null) { - return NO_MATCH; - } - ArgumentValue expected = entry.getValue(); - if (expected instanceof ScalarArgumentValue) { - if (!(actual instanceof ScalarArgument)) { - return NO_MATCH; - } - ScalarArgumentValue expectedScalar = (ScalarArgumentValue) expected; - ScalarArgument actualScalar = (ScalarArgument) actual; - if (!Objects.equals(expectedScalar.value, actualScalar.getValue())) { - return NO_MATCH; - } - - } else { - if (!(actual instanceof TableArgument)) { - return NO_MATCH; - } - TableArgumentValue expectedTableArgument = (TableArgumentValue) expected; - TableArgument actualTableArgument = (TableArgument) actual; - if (expectedTableArgument.rowSemantics != actualTableArgument.isRowSemantics()) { - // check row semantic - return NO_MATCH; - } - if (expectedTableArgument.passThroughColumns) { - // check pass through columns - Optional passThroughSpecification = - tableFunctionProcessorNode.getPassThroughSpecification(); - if (!passThroughSpecification.isPresent() - || !passThroughSpecification.get().isDeclaredAsPassThrough()) { - return NO_MATCH; - } - Set expectedPassThrough = - expectedTableArgument.passThroughSymbol.stream() - .map(symbolAliases::get) - .collect(toImmutableSet()); - Set actualPassThrough = - passThroughSpecification.get().getColumns().stream() - .map(TableFunctionNode.PassThroughColumn::getSymbol) - .map(Symbol::toSymbolReference) - .collect(toImmutableSet()); - - if (!expectedPassThrough.equals(actualPassThrough)) { - return NO_MATCH; - } - } - if (expectedTableArgument.specification.isPresent() - && tableFunctionProcessorNode.getDataOrganizationSpecification().isPresent()) { - // check data organization - DataOrganizationSpecification expectedDataOrganization = - expectedTableArgument.specification.get().getExpectedValue(symbolAliases); - DataOrganizationSpecification actualDataOrganization = - tableFunctionProcessorNode.getDataOrganizationSpecification().get(); - if (!expectedDataOrganization.equals(actualDataOrganization)) { - return NO_MATCH; - } - } - } + if (!handle.equals(tableFunctionProcessorNode.getTableFunctionHandle())) { + return NO_MATCH; } ImmutableMap.Builder properOutputsMapping = ImmutableMap.builder(); for (int i = 0; i < properOutputs.size(); i++) { @@ -179,17 +110,15 @@ public String toString() { .add("name", name) .add("properOutputs", properOutputs) .add("requiredSymbols", requiredSymbols) - .add("arguments", arguments) + .add("handle", handle) .toString(); } - public interface ArgumentValue {} - public static class Builder { private String name; private List properOutputs = ImmutableList.of(); private List requiredSymbols = ImmutableList.of(); - private final ImmutableMap.Builder arguments = ImmutableMap.builder(); + private TableFunctionHandle handle; public Builder name(String name) { this.name = name; @@ -206,98 +135,13 @@ public Builder requiredSymbols(String... requiredSymbols) { return this; } - public TableFunctionProcessorMatcher.Builder addScalarArgument(String name, Object value) { - this.arguments.put(name, new ScalarArgumentValue(value)); - return this; - } - - public TableFunctionProcessorMatcher.Builder addTableArgument( - String name, TableArgumentValue.Builder tableArgument) { - this.arguments.put(name, tableArgument.build()); + public Builder handle(TableFunctionHandle handle) { + this.handle = handle; return this; } public TableFunctionProcessorMatcher build() { - return new TableFunctionProcessorMatcher( - name, properOutputs, requiredSymbols, arguments.build()); - } - } - - public static class ScalarArgumentValue implements ArgumentValue { - protected Object value; - - public ScalarArgumentValue(Object value) { - this.value = value; - } - - @Override - public String toString() { - return toStringHelper(this).add("value", value).toString(); - } - } - - public static class TableArgumentValue implements ArgumentValue { - protected boolean rowSemantics; - protected boolean passThroughColumns; - protected Optional> specification; - protected Set passThroughSymbol; - - public TableArgumentValue( - boolean rowSemantics, - boolean passThroughColumns, - Optional> specification, - Set passThroughSymbol) { - this.rowSemantics = rowSemantics; - this.passThroughColumns = passThroughColumns; - this.specification = specification; - this.passThroughSymbol = passThroughSymbol; - } - - @Override - public String toString() { - return toStringHelper(this) - .omitNullValues() - .add("rowSemantics", rowSemantics) - .add("passThroughColumns", passThroughColumns) - .add("specification", specification) - .add("passThroughSymbol", passThroughSymbol) - .toString(); - } - - public static class Builder { - private boolean rowSemantics; - private boolean passThroughColumns; - private Optional> specification = - Optional.empty(); - private Set passThroughSymbols = ImmutableSet.of(); - - private Builder() {} - - public static Builder tableArgument() { - return new Builder(); - } - - public Builder rowSemantics() { - this.rowSemantics = true; - return this; - } - - public Builder specification( - ExpectedValueProvider specification) { - this.specification = Optional.of(specification); - return this; - } - - public Builder passThroughSymbols(String... symbols) { - this.passThroughColumns = true; - this.passThroughSymbols = ImmutableSet.copyOf(symbols); - return this; - } - - private TableArgumentValue build() { - return new TableArgumentValue( - rowSemantics, passThroughColumns, specification, passThroughSymbols); - } + return new TableFunctionProcessorMatcher(name, properOutputs, requiredSymbols, handle); } } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java index 630273efbb07..a87acce17f7f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CapacityTableFunction.java @@ -62,8 +62,8 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF if (size <= 0) { throw new UDFException("Size must be greater than 0"); } - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty(SIZE_PARAMETER_NAME, size); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder().addProperty(SIZE_PARAMETER_NAME, size).build(); return TableFunctionAnalysis.builder() .properColumnSchema( new DescribedSchema.Builder().addField("window_index", Type.INT64).build()) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java index b37ea8450d59..e0c118516048 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/CumulateTableFunction.java @@ -96,11 +96,14 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .addField("window_start", Type.TIMESTAMP) .addField("window_end", Type.TIMESTAMP) .build(); - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty(STEP_PARAMETER_NAME, step); - handle.addProperty(SIZE_PARAMETER_NAME, size); - handle.addProperty( - ORIGIN_PARAMETER_NAME, ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty(STEP_PARAMETER_NAME, step) + .addProperty(SIZE_PARAMETER_NAME, size) + .addProperty( + ORIGIN_PARAMETER_NAME, + ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()) + .build(); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java index 382fd349dbad..07c4089a736f 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/HOPTableFunction.java @@ -86,13 +86,18 @@ public TableFunctionAnalysis analyze(Map arguments) { .addField("window_start", Type.TIMESTAMP) .addField("window_end", Type.TIMESTAMP) .build(); - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty( - ORIGIN_PARAMETER_NAME, ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()); - handle.addProperty( - SLIDE_PARAMETER_NAME, ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue()); - handle.addProperty( - SIZE_PARAMETER_NAME, ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty( + ORIGIN_PARAMETER_NAME, + ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()) + .addProperty( + SLIDE_PARAMETER_NAME, + ((ScalarArgument) arguments.get(SLIDE_PARAMETER_NAME)).getValue()) + .addProperty( + SIZE_PARAMETER_NAME, + ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()) + .build(); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java index 4133d36c8da0..7aa0bed82acf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/SessionTableFunction.java @@ -77,9 +77,11 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .addField("window_end", Type.TIMESTAMP) .build(); - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty( - GAP_PARAMETER_NAME, ((ScalarArgument) arguments.get(GAP_PARAMETER_NAME)).getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty( + GAP_PARAMETER_NAME, ((ScalarArgument) arguments.get(GAP_PARAMETER_NAME)).getValue()) + .build(); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java index 161e2b040c24..fe5db7e2a108 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/TumbleTableFunction.java @@ -85,11 +85,15 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .addField("window_end", Type.TIMESTAMP) .build(); - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty( - ORIGIN_PARAMETER_NAME, ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()); - handle.addProperty( - SIZE_PARAMETER_NAME, ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty( + ORIGIN_PARAMETER_NAME, + ((ScalarArgument) arguments.get(ORIGIN_PARAMETER_NAME)).getValue()) + .addProperty( + SIZE_PARAMETER_NAME, + ((ScalarArgument) arguments.get(SIZE_PARAMETER_NAME)).getValue()) + .build(); // outputColumnSchema return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java index 6faf47289b82..5a0f992bb551 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java @@ -78,9 +78,12 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF DescribedSchema properColumnSchema = new DescribedSchema.Builder().addField("window_index", Type.INT64).build(); // outputColumnSchema - MapTableFunctionHandle handle = new MapTableFunctionHandle(); - handle.addProperty( - DELTA_PARAMETER_NAME, ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue()); + MapTableFunctionHandle handle = + new MapTableFunctionHandle.Builder() + .addProperty( + DELTA_PARAMETER_NAME, + ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue()) + .build(); return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) .requireRecordSnapshot(false) From 467768eb5c96731bef31926dc4626774409c3a21 Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Thu, 24 Apr 2025 23:05:53 +0900 Subject: [PATCH 4/5] compile error --- .../query/udf/example/relational/MySelectColumn.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySelectColumn.java b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySelectColumn.java index 27409032702d..6cc0d683e100 100644 --- a/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySelectColumn.java +++ b/integration-test/src/main/java/org/apache/iotdb/db/query/udf/example/relational/MySelectColumn.java @@ -20,8 +20,10 @@ package org.apache.iotdb.db.query.udf.example.relational; import org.apache.iotdb.udf.api.exception.UDFException; +import org.apache.iotdb.udf.api.relational.EmptyTableFunctionHandle; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.table.TableFunctionAnalysis; +import org.apache.iotdb.udf.api.relational.table.TableFunctionHandle; import org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider; import org.apache.iotdb.udf.api.relational.table.argument.Argument; import org.apache.iotdb.udf.api.relational.table.argument.DescribedSchema; @@ -66,11 +68,18 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF return TableFunctionAnalysis.builder() .properColumnSchema(schemaBuilder.build()) .requiredColumns(TBL_PARAM, requiredColumns) + .handle(new EmptyTableFunctionHandle()) .build(); } @Override - public TableFunctionProcessorProvider getProcessorProvider(Map arguments) { + public TableFunctionHandle createTableFunctionHandle() { + return new EmptyTableFunctionHandle(); + } + + @Override + public TableFunctionProcessorProvider getProcessorProvider( + TableFunctionHandle tableFunctionHandle) { return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { From 55921221dee903b629f225cee0a62746859b365a Mon Sep 17 00:00:00 2001 From: Chen YZ Date: Fri, 25 Apr 2025 22:01:28 +0900 Subject: [PATCH 5/5] fix IT --- .../table/MapTableFunctionHandle.java | 26 +++++++++++++++++-- .../node/TableFunctionProcessorNode.java | 2 -- .../analyzer/TableFunctionTest.java | 17 ++++++++++++ 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java index e656d68ad88c..da27eb22cd23 100644 --- a/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java +++ b/iotdb-api/udf-api/src/main/java/org/apache/iotdb/udf/api/relational/table/MapTableFunctionHandle.java @@ -34,7 +34,7 @@ public class MapTableFunctionHandle implements TableFunctionHandle { private static final Set> SUPPORT_VALUE_TYPE = new HashSet<>( Arrays.asList( - Integer.class, Long.class, Double.class, Float.class, String.class, Double.class)); + Integer.class, Long.class, Double.class, Float.class, String.class, Boolean.class)); private final Map map = new HashMap<>(); public void addProperty(String key, Object value) { @@ -50,7 +50,7 @@ public Object getProperty(String key) { @Override public byte[] serialize() { - ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES); + ByteBuffer buffer = ByteBuffer.allocate(calculateSerializeSize()); buffer.putInt(map.size()); for (Map.Entry entry : map.entrySet()) { byte[] bytes = entry.getKey().getBytes(StandardCharsets.UTF_8); @@ -81,6 +81,28 @@ public byte[] serialize() { return buffer.array(); } + private int calculateSerializeSize() { + int size = Integer.SIZE; + for (Map.Entry entry : map.entrySet()) { + size += Integer.BYTES + entry.getKey().getBytes(StandardCharsets.UTF_8).length + Byte.BYTES; + if (entry.getValue() instanceof Long) { + size += Long.BYTES; + } else if (entry.getValue() instanceof Integer) { + size += Integer.BYTES; + } else if (entry.getValue() instanceof Double) { + size += Double.BYTES; + } else if (entry.getValue() instanceof Float) { + size += Float.BYTES; + } else if (entry.getValue() instanceof Boolean) { + size += Byte.BYTES; + } else if (entry.getValue() instanceof String) { + byte[] bytes = ((String) entry.getValue()).getBytes(StandardCharsets.UTF_8); + size += Integer.BYTES + bytes.length; + } + } + return size; + } + @Override public void deserialize(byte[] bytes) { ByteBuffer buffer = ByteBuffer.wrap(bytes); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java index 5172aa786a3a..2ffb128b66be 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/TableFunctionProcessorNode.java @@ -203,7 +203,6 @@ protected void serializeAttributes(ByteBuffer byteBuffer) { } ReadWriteIOUtils.write(rowSemantic, byteBuffer); byte[] bytes = tableFunctionHandle.serialize(); - ReadWriteIOUtils.write(bytes.length, byteBuffer); ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), byteBuffer); ReadWriteIOUtils.write(requireRecordSnapshot, byteBuffer); } @@ -230,7 +229,6 @@ protected void serializeAttributes(DataOutputStream stream) throws IOException { } ReadWriteIOUtils.write(rowSemantic, stream); byte[] bytes = tableFunctionHandle.serialize(); - ReadWriteIOUtils.write(bytes.length, stream); ReadWriteIOUtils.write(ByteBuffer.wrap(bytes), stream); ReadWriteIOUtils.write(requireRecordSnapshot, stream); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java index 63924ed0111d..3aa3d85fa809 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/TableFunctionTest.java @@ -315,4 +315,21 @@ public void testHybrid() { tableFunctionProcessor( hopMatcher, project(tableFunctionProcessor(excludeMatcher, tableScan)))))); } + + @Test + public void testSerDeserializeMapTableFunctionHandle() { + MapTableFunctionHandle mapTableFunctionHandle = + new MapTableFunctionHandle.Builder() + .addProperty("key1", "value1") + .addProperty("key2", 2) + .addProperty("key3", 1L) + .addProperty("key4", 3.0) + .addProperty("key5", true) + .addProperty("key6", 2.3f) + .build(); + byte[] serialized = mapTableFunctionHandle.serialize(); + MapTableFunctionHandle deserialized = new MapTableFunctionHandle(); + deserialized.deserialize(serialized); + assert mapTableFunctionHandle.equals(deserialized); + } }