
Commit f1d0ab6

[FLINK-37541][table-planner] Deprecate getTargetColumns in DynamicTableSink.Context (#26381)
1 parent cc55c56 commit f1d0ab6

File tree: 10 files changed, +67 −26 lines


flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
Lines changed: 3 additions & 0 deletions

@@ -190,7 +190,10 @@ interface Context {
          * </ul>
          *
          * <p>Note: will always return empty for the delete statement because it has no column list.
+         *
+         * @deprecated use {@link SupportsTargetColumnWriting} instead.
          */
+        @Deprecated(since = "2.2")
        Optional<int[][]> getTargetColumns();
    }
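
For connector authors, the replacement is the sink ability interface rather than the runtime context. Below is a minimal migration sketch, assuming SupportsTargetColumnWriting lives alongside the other sink abilities and follows their usual pattern with a boolean applyTargetColumns(int[][]) callback; the interface itself is not shown in this commit, so package and signature here are assumptions.

    import org.apache.flink.table.connector.ChangelogMode;
    import org.apache.flink.table.connector.sink.DynamicTableSink;
    // Package assumed by analogy with SupportsOverwrite, SupportsPartitioning, etc.
    import org.apache.flink.table.connector.sink.abilities.SupportsTargetColumnWriting;

    /** Sketch of a sink that migrates off the deprecated Context#getTargetColumns. */
    public class MyPartialInsertSink implements DynamicTableSink, SupportsTargetColumnWriting {

        // Populated by the planner when the INSERT statement has a column list.
        private int[][] targetColumns;

        @Override
        public boolean applyTargetColumns(int[][] targetColumns) {
            this.targetColumns = targetColumns;
            return true; // signal that this sink can handle partial inserts
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return ChangelogMode.insertOnly();
        }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
            // Before 2.2 the columns were read here via context.getTargetColumns();
            // now they arrive earlier through applyTargetColumns(int[][]).
            throw new UnsupportedOperationException("runtime wiring elided in this sketch");
        }

        @Override
        public DynamicTableSink copy() {
            MyPartialInsertSink copy = new MyPartialInsertSink();
            copy.targetColumns = targetColumns;
            return copy;
        }

        @Override
        public String asSummaryString() {
            return "MyPartialInsertSink";
        }
    }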

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
Lines changed: 6 additions & 0 deletions

@@ -25,6 +25,7 @@
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;

 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
@@ -52,6 +53,7 @@ public class DynamicTableSinkSpec extends DynamicTableSpecBase {
     private final ContextResolvedTable contextResolvedTable;
     private final @Nullable List<SinkAbilitySpec> sinkAbilities;

+    @Deprecated(since = "2.2")
     private final @Nullable int[][] targetColumns;

     private DynamicTableSink tableSink;
@@ -99,9 +101,13 @@ public DynamicTableSink getTableSink(FlinkContext context) {
         return tableSink;
     }

+    /**
+     * @deprecated use {@link TargetColumnWritingSpec} instead.
+     */
     @JsonGetter(FIELD_NAME_TARGET_COLUMNS)
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     @Nullable
+    @Deprecated(since = "2.2")
    public int[][] getTargetColumns() {
        return targetColumns;
    }
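
The getter stays in place so compiled plans serialized before 2.2 keep restoring, while NON_EMPTY means new plans simply omit the field when no column list was specified. A self-contained sketch of that serialization behavior, using plain (unshaded) Jackson and a hypothetical SpecSketch class:

    import com.fasterxml.jackson.annotation.JsonInclude;
    import com.fasterxml.jackson.databind.ObjectMapper;

    /** Standalone sketch of the NON_EMPTY behavior used by DynamicTableSinkSpec. */
    public class SpecSketch {

        @Deprecated
        @JsonInclude(JsonInclude.Include.NON_EMPTY)
        public int[][] targetColumns; // dropped from JSON when null or empty

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();

            SpecSketch spec = new SpecSketch();
            System.out.println(mapper.writeValueAsString(spec)); // {}

            spec.targetColumns = new int[][] {{0}, {2}};
            // {"targetColumns":[[0],[2]]}
            System.out.println(mapper.writeValueAsString(spec));
        }
    }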

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
Lines changed: 2 additions & 9 deletions

@@ -20,7 +20,6 @@

 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
-import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
 import org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
 import org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalUnion;
@@ -217,18 +216,12 @@ private class ReusableSinkGroup {
             this.originalSinks.add(sink);
             this.inputTraitSet = sink.getInput().getTraitSet();
             this.digest = getDigest(sink);
-            this.sinkAbilitySpecs =
-                    isStreamingMode
-                            ? ((StreamPhysicalSink) sink).abilitySpecs()
-                            : ((BatchPhysicalSink) sink).abilitySpecs();
+            this.sinkAbilitySpecs = sink.abilitySpecs();
         }

         public boolean canBeReused(Sink sinkNode) {
             String currentSinkDigest = getDigest(sinkNode);
-            SinkAbilitySpec[] currentSinkSpecs =
-                    isStreamingMode
-                            ? ((StreamPhysicalSink) sinkNode).abilitySpecs()
-                            : ((BatchPhysicalSink) sinkNode).abilitySpecs();
+            SinkAbilitySpec[] currentSinkSpecs = sinkNode.abilitySpecs();
             RelTraitSet currentInputTraitSet = sinkNode.getInput().getTraitSet();

             // Only table sink with the same digest, specs and input trait set can be reused
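
The mode-dependent casts become unnecessary because abilitySpecs is hoisted into the shared calcite Sink base class (see the Sink.scala change below), so stream and batch sinks expose it polymorphically. A simplified Java sketch of that shape, with hypothetical class bodies:

    /** Simplified sketch: the specs live on the base class, not on each subclass. */
    interface SinkAbilitySpec {}

    abstract class Sink {
        private final SinkAbilitySpec[] abilitySpecs;

        protected Sink(SinkAbilitySpec[] abilitySpecs) {
            this.abilitySpecs = abilitySpecs;
        }

        // Callers such as SinkReuser no longer need isStreamingMode branches or casts.
        public SinkAbilitySpec[] abilitySpecs() {
            return abilitySpecs;
        }
    }

    class StreamPhysicalSink extends Sink {
        StreamPhysicalSink(SinkAbilitySpec[] specs) {
            super(specs);
        }
    }

    class BatchPhysicalSink extends Sink {
        BatchPhysicalSink(SinkAbilitySpec[] specs) {
            super(specs);
        }
    }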

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala
Lines changed: 10 additions & 2 deletions

@@ -42,8 +42,16 @@ final class LogicalSink(
     tableSink: DynamicTableSink,
     targetColumns: Array[Array[Int]],
     val staticPartitions: Map[String, String],
-    val abilitySpecs: Array[SinkAbilitySpec])
-  extends Sink(cluster, traitSet, input, hints, targetColumns, contextResolvedTable, tableSink) {
+    abilitySpecs: Array[SinkAbilitySpec])
+  extends Sink(
+    cluster,
+    traitSet,
+    input,
+    hints,
+    targetColumns,
+    contextResolvedTable,
+    tableSink,
+    abilitySpecs) {

   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
     new LogicalSink(

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
Lines changed: 9 additions & 2 deletions

@@ -19,6 +19,8 @@ package org.apache.flink.table.planner.plan.nodes.calcite

 import org.apache.flink.table.catalog.ContextResolvedTable
 import org.apache.flink.table.connector.sink.DynamicTableSink
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec
+import org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec
 import org.apache.flink.table.planner.plan.utils.RelExplainUtil

 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -42,20 +44,25 @@ import scala.collection.JavaConversions._
  * @param hints
  *   the hints
  * @param targetColumns
- *   the specified target columns.
+ *   the specified target columns. @Deprecated(since = "2.2"), use [[TargetColumnWritingSpec]]
+ *   instead.
  * @param contextResolvedTable
  *   the table definition.
  * @param tableSink
  *   the [[DynamicTableSink]] for which to write into
+ * @param abilitySpecs
+ *   the [[SinkAbilitySpec]]s of this sink
  */
 abstract class Sink(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     input: RelNode,
     val hints: util.List[RelHint],
+    @deprecated(since = "2.2")
     val targetColumns: Array[Array[Int]],
     val contextResolvedTable: ContextResolvedTable,
-    val tableSink: DynamicTableSink)
+    val tableSink: DynamicTableSink,
+    val abilitySpecs: Array[SinkAbilitySpec])
   extends SingleRel(cluster, traitSet, input) {

   override def deriveRowType(): RelDataType = {

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
Lines changed: 10 additions & 2 deletions

@@ -46,8 +46,16 @@ class FlinkLogicalSink(
     tableSink: DynamicTableSink,
     targetColumns: Array[Array[Int]],
     val staticPartitions: Map[String, String],
-    val abilitySpecs: Array[SinkAbilitySpec])
-  extends Sink(cluster, traitSet, input, hints, targetColumns, contextResolvedTable, tableSink)
+    abilitySpecs: Array[SinkAbilitySpec])
+  extends Sink(
+    cluster,
+    traitSet,
+    input,
+    hints,
+    targetColumns,
+    contextResolvedTable,
+    tableSink,
+    abilitySpecs)
   with FlinkLogicalRel {

   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
Lines changed: 10 additions & 2 deletions

@@ -42,8 +42,16 @@ class BatchPhysicalSink(
     contextResolvedTable: ContextResolvedTable,
     tableSink: DynamicTableSink,
     targetColumns: Array[Array[Int]],
-    val abilitySpecs: Array[SinkAbilitySpec])
-  extends Sink(cluster, traitSet, inputRel, hints, targetColumns, contextResolvedTable, tableSink)
+    abilitySpecs: Array[SinkAbilitySpec])
+  extends Sink(
+    cluster,
+    traitSet,
+    inputRel,
+    hints,
+    targetColumns,
+    contextResolvedTable,
+    tableSink,
+    abilitySpecs)
   with BatchPhysicalRel {

   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
Lines changed: 10 additions & 2 deletions

@@ -46,9 +46,17 @@ class StreamPhysicalSink(
     contextResolvedTable: ContextResolvedTable,
     tableSink: DynamicTableSink,
     targetColumns: Array[Array[Int]],
-    val abilitySpecs: Array[SinkAbilitySpec],
+    abilitySpecs: Array[SinkAbilitySpec],
     val upsertMaterialize: Boolean = false)
-  extends Sink(cluster, traitSet, inputRel, hints, targetColumns, contextResolvedTable, tableSink)
+  extends Sink(
+    cluster,
+    traitSet,
+    inputRel,
+    hints,
+    targetColumns,
+    contextResolvedTable,
+    tableSink,
+    abilitySpecs)
   with StreamPhysicalRel {

   override def requireWatermark: Boolean = false

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
Lines changed: 6 additions & 7 deletions

@@ -2371,16 +2371,15 @@ public Optional<Integer> getParallelism() {
         } else {
             // we don't support OutputFormat for updating query in the TestValues connector
             assertThat(runtimeSink.equals("SinkFunction")).isTrue();
-            // check the contract of the context.getTargetColumns method returns the expected
-            // empty Option or non-empty Option with a non-empty array
-            assertThat(
-                            !context.getTargetColumns().isPresent()
-                                    || context.getTargetColumns().get().length > 0)
-                    .isTrue();
+            // check the contract that targetColumns should be null for empty array and should
+            // only be applied with a non-empty array
+            assertThat(this.targetColumns == null || this.targetColumns.length > 0).isTrue();
             SinkFunction<RowData> sinkFunction;
             if (primaryKeyIndices.length > 0) {
                 // TODO FLINK-31301 currently partial-insert composite columns are not supported
-                int[][] targetColumns = context.getTargetColumns().orElse(new int[0][]);
+                int[][] targetColumns =
+                        this.targetColumns != null ? this.targetColumns : new int[0][];
+
                 checkArgument(
                         Arrays.stream(targetColumns).allMatch(subArr -> subArr.length <= 1),
                         "partial-insert composite columns are not supported yet!");

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
Lines changed: 1 addition & 0 deletions

@@ -74,6 +74,7 @@ public DynamicTableSink.DataStructureConverter createDataStructureConverter(
     }

     @Override
+    @Deprecated(since = "2.2")
    public Optional<int[][]> getTargetColumns() {
        return Optional.ofNullable(targetColumns);
    }
