Skip to content

Commit f549a70

Browse files
committed
[FLINK-38510][table-planner] Remove targetColumns from the digest generation in SinkReuser
1 parent f1d0ab6 commit f549a70

File tree

6 files changed

+93
-28
lines changed

6 files changed

+93
-28
lines changed

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -177,16 +177,6 @@ private String getDigest(Sink sink) {
177177
List<String> digest = new ArrayList<>();
178178
digest.add(sink.contextResolvedTable().getIdentifier().asSummaryString());
179179

180-
int[][] targetColumns = sink.targetColumns();
181-
if (targetColumns != null && targetColumns.length > 0) {
182-
digest.add(
183-
"targetColumns=["
184-
+ Arrays.stream(targetColumns)
185-
.map(Arrays::toString)
186-
.collect(Collectors.joining(","))
187-
+ "]");
188-
}
189-
190180
String fieldTypes =
191181
sink.getRowType().getFieldList().stream()
192182
.map(f -> f.getType().toString())

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ abstract class Sink(
8080
.getOrElse(Array.empty[Array[Int]])
8181
.map(_.mkString("[", ",", "]"))
8282
.mkString(","),
83-
targetColumns != null
83+
// only print target columns when the sink supports TargetColumnWriting
84+
targetColumns != null && abilitySpecs.exists(
85+
spec => spec.isInstanceOf[TargetColumnWritingSpec])
8486
)
8587
.item("fields", getRowType.getFieldNames.mkString(", "))
8688
.itemIf("hints", RelExplainUtil.hintsToString(hints), !hints.isEmpty)

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
4444
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
4545
import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate;
46+
import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;
4647
import org.apache.flink.table.connector.sink.abilities.SupportsTruncate;
4748
import org.apache.flink.table.connector.source.DynamicTableSource;
4849
import org.apache.flink.table.connector.source.ScanTableSource;
@@ -323,7 +324,7 @@ private static class TestScanContext implements RowLevelModificationScanContext
323324

324325
/** A sink that supports row-level update. */
325326
private static class SupportsRowLevelUpdateSink
326-
implements DynamicTableSink, SupportsRowLevelUpdate {
327+
implements DynamicTableSink, SupportsRowLevelUpdate, SupportsTargetColumnWriting {
327328

328329
protected final ObjectIdentifier tableIdentifier;
329330
protected final ResolvedCatalogTable resolvedCatalogTable;
@@ -447,6 +448,12 @@ public RowLevelUpdateMode getRowLevelUpdateMode() {
447448
}
448449
};
449450
}
451+
452+
@Override
453+
public boolean applyTargetColumns(int[][] targetColumns) {
454+
// Implement SupportsTargetColumnWriting for the compatibility of existing test cases
455+
return true;
456+
}
450457
}
451458

452459
/** A sink that supports row-level delete/update. */

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/common/SinkReuseTestBase.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.flink.table.api.StatementSet;
2222
import org.apache.flink.table.api.TableConfig;
23-
import org.apache.flink.table.api.config.OptimizerConfigOptions;
2423
import org.apache.flink.table.planner.plan.reuse.SinkReuser;
2524
import org.apache.flink.table.planner.utils.TableTestBase;
2625
import org.apache.flink.table.planner.utils.TableTestUtil;
@@ -35,8 +34,6 @@ public abstract class SinkReuseTestBase extends TableTestBase {
3534
@BeforeEach
3635
protected void setup() {
3736
TableConfig tableConfig = TableConfig.getDefault();
38-
tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true);
39-
tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_REUSE_SINK_ENABLED, true);
4037
util = getTableTestUtil(tableConfig);
4138

4239
util.tableEnv()
@@ -153,14 +150,25 @@ public void testSinkReuseFromSameSource() {
153150
}
154151

155152
@Test
156-
public void testSinkReuseWithPartialColumns() {
153+
public void testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting() {
157154
StatementSet statementSet = util.tableEnv().createStatementSet();
155+
// sink1 has not implemented the SupportsTargetColumnWriting sink ability
158156
statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source1)");
159157
statementSet.addInsertSql("INSERT INTO sink1(`y`) (SELECT y FROM source1)");
160158
statementSet.addInsertSql("INSERT INTO sink1(`x`) (SELECT x FROM source3)");
161159
util.verifyExecPlan(statementSet);
162160
}
163161

162+
@Test
163+
public void testSinkReuseWithPartialColumnsAndSupportsTargetColumnWriting() {
164+
StatementSet statementSet = util.tableEnv().createStatementSet();
165+
// sink2 has implemented the SupportsTargetColumnWriting sink ability
166+
statementSet.addInsertSql("INSERT INTO sink2(`x`) (SELECT x FROM source1)");
167+
statementSet.addInsertSql("INSERT INTO sink2(`y`) (SELECT y FROM source1)");
168+
statementSet.addInsertSql("INSERT INTO sink2(`x`) (SELECT x FROM source3)");
169+
util.verifyExecPlan(statementSet);
170+
}
171+
164172
@Test
165173
public void testSinkReuseWithOverwrite() {
166174
StatementSet statementSet = util.tableEnv().createStatementSet();

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/BatchSinkReuseTest.xml

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,18 +148,47 @@ Sink(table=[default_catalog.default_database.sink1], fields=[x, y], hints=[[[OPT
148148
]]>
149149
</Resource>
150150
</TestCase>
151-
<TestCase name="testSinkReuseWithPartialColumns">
151+
<TestCase name="testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting">
152152
<Resource name="ast">
153153
<![CDATA[
154-
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
154+
LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, EXPR$1])
155155
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
156156
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
157157
158-
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
158+
LogicalSink(table=[default_catalog.default_database.sink1], fields=[EXPR$0, y])
159159
+- LogicalProject(EXPR$0=[null:BIGINT], y=[$1])
160160
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
161161
162-
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
162+
LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, EXPR$1])
163+
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
164+
+- LogicalTableScan(table=[[default_catalog, default_database, source3]])
165+
]]>
166+
</Resource>
167+
<Resource name="optimized exec plan">
168+
<![CDATA[
169+
Sink(table=[default_catalog.default_database.sink1], fields=[x, EXPR$1])
170+
+- Union(all=[true], union=[x, EXPR$1])
171+
:- Calc(select=[x, null:BIGINT AS EXPR$1])
172+
: +- TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
173+
:- Calc(select=[null:BIGINT AS EXPR$0, y])
174+
: +- Reused(reference_id=[1])
175+
+- Calc(select=[x, null:BIGINT AS EXPR$1])
176+
+- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
177+
]]>
178+
</Resource>
179+
</TestCase>
180+
<TestCase name="testSinkReuseWithPartialColumnsAndSupportsTargetColumnWriting">
181+
<Resource name="ast">
182+
<![CDATA[
183+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
184+
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
185+
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
186+
187+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[1]], fields=[EXPR$0, y])
188+
+- LogicalProject(EXPR$0=[null:BIGINT], y=[$1])
189+
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
190+
191+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
163192
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
164193
+- LogicalTableScan(table=[[default_catalog, default_database, source3]])
165194
]]>
@@ -168,14 +197,14 @@ LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]],
168197
<![CDATA[
169198
TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
170199
171-
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
200+
Sink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
172201
+- Union(all=[true], union=[x, EXPR$1])
173202
:- Calc(select=[x, null:BIGINT AS EXPR$1])
174203
: +- Reused(reference_id=[1])
175204
+- Calc(select=[x, null:BIGINT AS EXPR$1])
176205
+- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
177206
178-
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
207+
Sink(table=[default_catalog.default_database.sink2], targetColumns=[[1]], fields=[EXPR$0, y])
179208
+- Calc(select=[null:BIGINT AS EXPR$0, y])
180209
+- Reused(reference_id=[1])
181210
]]>

flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/StreamSinkReuseTest.xml

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,18 +148,47 @@ Sink(table=[default_catalog.default_database.sink1], fields=[x, y])
148148
]]>
149149
</Resource>
150150
</TestCase>
151-
<TestCase name="testSinkReuseWithPartialColumns">
151+
<TestCase name="testSinkReuseWithPartialColumnsNotSupportsTargetColumnWriting">
152152
<Resource name="ast">
153153
<![CDATA[
154-
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
154+
LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, EXPR$1])
155155
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
156156
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
157157
158-
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
158+
LogicalSink(table=[default_catalog.default_database.sink1], fields=[EXPR$0, y])
159159
+- LogicalProject(EXPR$0=[null:BIGINT], y=[$1])
160160
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
161161
162-
LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
162+
LogicalSink(table=[default_catalog.default_database.sink1], fields=[x, EXPR$1])
163+
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
164+
+- LogicalTableScan(table=[[default_catalog, default_database, source3]])
165+
]]>
166+
</Resource>
167+
<Resource name="optimized exec plan">
168+
<![CDATA[
169+
Sink(table=[default_catalog.default_database.sink1], fields=[x, EXPR$1])
170+
+- Union(all=[true], union=[x, EXPR$1])
171+
:- Calc(select=[x, null:BIGINT AS EXPR$1])
172+
: +- TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
173+
:- Calc(select=[null:BIGINT AS EXPR$0, y])
174+
: +- Reused(reference_id=[1])
175+
+- Calc(select=[x, null:BIGINT AS EXPR$1])
176+
+- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
177+
]]>
178+
</Resource>
179+
</TestCase>
180+
<TestCase name="testSinkReuseWithPartialColumnsAndSupportsTargetColumnWriting">
181+
<Resource name="ast">
182+
<![CDATA[
183+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
184+
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
185+
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
186+
187+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[1]], fields=[EXPR$0, y])
188+
+- LogicalProject(EXPR$0=[null:BIGINT], y=[$1])
189+
+- LogicalTableScan(table=[[default_catalog, default_database, source1]])
190+
191+
LogicalSink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
163192
+- LogicalProject(x=[$0], EXPR$1=[null:BIGINT])
164193
+- LogicalTableScan(table=[[default_catalog, default_database, source3]])
165194
]]>
@@ -168,14 +197,14 @@ LogicalSink(table=[default_catalog.default_database.sink1], targetColumns=[[0]],
168197
<![CDATA[
169198
TableSourceScan(table=[[default_catalog, default_database, source1, project=[x, y], metadata=[]]], fields=[x, y])(reuse_id=[1])
170199
171-
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[0]], fields=[x, EXPR$1])
200+
Sink(table=[default_catalog.default_database.sink2], targetColumns=[[0]], fields=[x, EXPR$1])
172201
+- Union(all=[true], union=[x, EXPR$1])
173202
:- Calc(select=[x, null:BIGINT AS EXPR$1])
174203
: +- Reused(reference_id=[1])
175204
+- Calc(select=[x, null:BIGINT AS EXPR$1])
176205
+- TableSourceScan(table=[[default_catalog, default_database, source3, project=[x], metadata=[]]], fields=[x])
177206
178-
Sink(table=[default_catalog.default_database.sink1], targetColumns=[[1]], fields=[EXPR$0, y])
207+
Sink(table=[default_catalog.default_database.sink2], targetColumns=[[1]], fields=[EXPR$0, y])
179208
+- Calc(select=[null:BIGINT AS EXPR$0, y])
180209
+- Reused(reference_id=[1])
181210
]]>

0 commit comments

Comments
 (0)