Skip to content

Commit 5b61b1c

Browse files
authored
[FLINK-38556][table] Support filter and project between source and delta join (#27159)
1 parent f843c12 commit 5b61b1c

File tree

15 files changed

+2152
-253
lines changed

15 files changed

+2152
-253
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DeltaJoinSpec.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
2525
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
2626
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
27+
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
2728
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
2829

2930
import org.apache.calcite.rex.RexNode;
3031

3132
import javax.annotation.Nullable;
3233

34+
import java.util.List;
3335
import java.util.Map;
3436
import java.util.Optional;
3537

@@ -44,6 +46,9 @@ public class DeltaJoinSpec {
4446
public static final String FIELD_NAME_LOOKUP_TABLE = "lookupTable";
4547
public static final String FIELD_NAME_LOOKUP_KEYS = "lookupKeys";
4648
public static final String FIELD_NAME_REMAINING_CONDITION = "remainingCondition";
49+
public static final String FIELD_NAME_PROJECTION_ON_TEMPORAL_TABLE =
50+
"projectionOnTemporalTable";
51+
public static final String FIELD_NAME_FILTER_ON_TEMPORAL_TABLE = "filterOnTemporalTable";
4752

4853
@JsonProperty(FIELD_NAME_LOOKUP_TABLE)
4954
private final TemporalTableSourceSpec lookupTable;
@@ -56,15 +61,29 @@ public class DeltaJoinSpec {
5661
@JsonProperty(FIELD_NAME_REMAINING_CONDITION)
5762
private final @Nullable RexNode remainingCondition;
5863

64+
@JsonProperty(FIELD_NAME_PROJECTION_ON_TEMPORAL_TABLE)
65+
@JsonInclude(JsonInclude.Include.NON_NULL)
66+
private final @Nullable List<RexNode> projectionOnTemporalTable;
67+
68+
@JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE)
69+
@JsonInclude(JsonInclude.Include.NON_NULL)
70+
private final @Nullable RexNode filterOnTemporalTable;
71+
5972
@JsonCreator
6073
public DeltaJoinSpec(
6174
@JsonProperty(FIELD_NAME_LOOKUP_TABLE) TemporalTableSourceSpec lookupTable,
6275
@JsonProperty(FIELD_NAME_LOOKUP_KEYS)
6376
Map<Integer, FunctionCallUtil.FunctionParam> lookupKeyMap,
64-
@JsonProperty(FIELD_NAME_REMAINING_CONDITION) @Nullable RexNode remainingCondition) {
77+
@JsonProperty(FIELD_NAME_REMAINING_CONDITION) @Nullable RexNode remainingCondition,
78+
@JsonProperty(FIELD_NAME_PROJECTION_ON_TEMPORAL_TABLE) @Nullable
79+
List<RexNode> projectionOnTemporalTable,
80+
@JsonProperty(FIELD_NAME_FILTER_ON_TEMPORAL_TABLE) @Nullable
81+
RexNode filterOnTemporalTable) {
6582
this.lookupKeyMap = lookupKeyMap;
6683
this.lookupTable = lookupTable;
6784
this.remainingCondition = remainingCondition;
85+
this.projectionOnTemporalTable = projectionOnTemporalTable;
86+
this.filterOnTemporalTable = filterOnTemporalTable;
6887
}
6988

7089
@JsonIgnore
@@ -81,4 +100,14 @@ public Map<Integer, FunctionCallUtil.FunctionParam> getLookupKeyMap() {
81100
public Optional<RexNode> getRemainingCondition() {
82101
return Optional.ofNullable(remainingCondition);
83102
}
103+
104+
@JsonIgnore
105+
public Optional<List<RexNode>> getProjectionOnTemporalTable() {
106+
return Optional.ofNullable(projectionOnTemporalTable);
107+
}
108+
109+
@JsonIgnore
110+
public Optional<RexNode> getFilterOnTemporalTable() {
111+
return Optional.ofNullable(filterOnTemporalTable);
112+
}
84113
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.planner.plan.nodes.exec.stream;
2020

2121
import org.apache.flink.FlinkVersion;
22+
import org.apache.flink.api.common.functions.FlatMapFunction;
2223
import org.apache.flink.api.dag.Transformation;
2324
import org.apache.flink.api.java.tuple.Tuple2;
2425
import org.apache.flink.configuration.ReadableConfig;
@@ -54,6 +55,7 @@
5455
import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
5556
import org.apache.flink.table.planner.utils.ShortcutUtils;
5657
import org.apache.flink.table.runtime.collector.TableFunctionResultFuture;
58+
import org.apache.flink.table.runtime.generated.GeneratedFunction;
5759
import org.apache.flink.table.runtime.generated.GeneratedResultFuture;
5860
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
5961
import org.apache.flink.table.runtime.operators.StreamingDeltaJoinOperatorFactory;
@@ -389,6 +391,7 @@ private AsyncDeltaJoinRunner createAsyncDeltaJoinRunner(
389391
boolean treatRightAsLookupTable) {
390392
RelOptTable lookupTable = treatRightAsLookupTable ? rightTempTable : leftTempTable;
391393
RowType streamSideType = treatRightAsLookupTable ? leftStreamSideType : rightStreamSideType;
394+
RowType lookupSideType = treatRightAsLookupTable ? rightStreamSideType : leftStreamSideType;
392395

393396
AsyncTableFunction<?> lookupSideAsyncTableFunction =
394397
getUnwrappedAsyncLookupFunction(lookupTable, lookupKeys.keySet(), classLoader);
@@ -454,11 +457,36 @@ private AsyncDeltaJoinRunner createAsyncDeltaJoinRunner(
454457
JavaScalaConversionUtil.toScala(newCond));
455458
}
456459

460+
GeneratedFunction<FlatMapFunction<RowData, RowData>> lookupSideGeneratedCalc = null;
461+
if ((treatRightAsLookupTable
462+
&& lookupRightTableJoinSpec.getProjectionOnTemporalTable().isPresent())
463+
|| (!treatRightAsLookupTable
464+
&& lookupLeftTableJoinSpec.getProjectionOnTemporalTable().isPresent())) {
465+
// a projection or filter after lookup table
466+
List<RexNode> projectionOnTemporalTable =
467+
treatRightAsLookupTable
468+
? lookupRightTableJoinSpec.getProjectionOnTemporalTable().get()
469+
: lookupLeftTableJoinSpec.getProjectionOnTemporalTable().get();
470+
RexNode filterOnTemporalTable =
471+
treatRightAsLookupTable
472+
? lookupRightTableJoinSpec.getFilterOnTemporalTable().orElse(null)
473+
: lookupLeftTableJoinSpec.getFilterOnTemporalTable().orElse(null);
474+
lookupSideGeneratedCalc =
475+
LookupJoinCodeGenerator.generateCalcMapFunction(
476+
config,
477+
planner.getFlinkContext().getClassLoader(),
478+
JavaScalaConversionUtil.toScala(projectionOnTemporalTable),
479+
filterOnTemporalTable,
480+
lookupSideType,
481+
lookupTableSourceRowType);
482+
}
483+
457484
return new AsyncDeltaJoinRunner(
458485
lookupSideGeneratedFuncWithType.tableFunc(),
459486
(DataStructureConverter<RowData, Object>) lookupSideFetcherConverter,
487+
lookupSideGeneratedCalc,
460488
lookupSideGeneratedResultFuture,
461-
InternalSerializers.create(lookupTableSourceRowType),
489+
InternalSerializers.create(lookupSideType),
462490
leftJoinKeySelector,
463491
leftUpsertKeySelector,
464492
rightJoinKeySelector,

0 commit comments

Comments
 (0)