Skip to content

Commit 6cf7fdf

Browse files
committed
address comment
1 parent 3d28928 commit 6cf7fdf

File tree

3 files changed

+5
-2
lines changed

3 files changed

+5
-2
lines changed

docs/themes/book

Submodule book updated 215 files

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecDeltaJoin.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
142142

143143
@JsonProperty(FIELD_NAME_LEFT_UPSERT_KEY)
144144
@JsonInclude(JsonInclude.Include.NON_NULL)
145+
@Nullable
145146
private final int[] leftUpsertKeys;
146147

147148
// left (streaming) side join right (lookup) side
@@ -155,6 +156,7 @@ public class StreamExecDeltaJoin extends ExecNodeBase<RowData>
155156

156157
@JsonProperty(FIELD_NAME_RIGHT_UPSERT_KEY)
157158
@JsonInclude(JsonInclude.Include.NON_NULL)
159+
@Nullable
158160
private final int[] rightUpsertKeys;
159161

160162
// right (streaming) side join left (lookup) side

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/DeltaJoinUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,8 @@ private static boolean areJoinConditionsSupported(StreamPhysicalJoin join) {
252252
return false;
253253
}
254254

255-
// if this join output cdc records, the non-equiv condition must be applied on upsert key
255+
// if this join outputs cdc records and has non-equiv condition, the reference columns in
256+
// the non-equiv condition must come from the same set of upsert keys
256257
ChangelogMode changelogMode = getChangelogMode(join);
257258
if (changelogMode.containsOnly(RowKind.INSERT)) {
258259
return true;

0 commit comments

Comments
 (0)