Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@

import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -42,6 +44,7 @@ public final class SinkTestStep extends TableTestStep {
public final @Nullable List<Row> expectedMaterializedRows;
public final @Nullable List<String> expectedMaterializedStrings;
public final boolean testChangelogData;
public final @Nullable int[] deduplicatedFieldIndices;

SinkTestStep(
String name,
Expand All @@ -55,17 +58,16 @@ public final class SinkTestStep extends TableTestStep {
@Nullable List<String> expectedAfterRestoreStrings,
@Nullable List<Row> expectedMaterializedRows,
@Nullable List<String> expectedMaterializedStrings,
boolean testChangelogData) {
super(name, schemaComponents, distribution, partitionKeys, options);
boolean hasRowsSet =
expectedBeforeRestore != null
|| expectedAfterRestore != null
|| expectedMaterializedRows != null;
boolean hasStringsSet =
expectedBeforeRestoreStrings != null
|| expectedAfterRestoreStrings != null
|| expectedMaterializedStrings != null;
if (hasRowsSet && hasStringsSet) {
boolean testChangelogData,
@Nullable int[] deduplicatedFieldIndices) {
super(
name,
schemaComponents,
distribution,
partitionKeys,
options,
Collections.emptyList());
if (hasRowsSet() && hasStringsSet()) {
throw new IllegalArgumentException(
"You can not mix Row/String representations in restore data.");
}
Expand All @@ -76,6 +78,12 @@ public final class SinkTestStep extends TableTestStep {
this.expectedMaterializedRows = expectedMaterializedRows;
this.expectedMaterializedStrings = expectedMaterializedStrings;
this.testChangelogData = testChangelogData;
this.deduplicatedFieldIndices = deduplicatedFieldIndices;

if (deduplicatedFieldIndices != null && !hasRowsSet()) {
throw new IllegalArgumentException(
"DeduplicatedFieldIndices can only be used for Row representations in restore data.");
}
}

/** Builder for creating a {@link SinkTestStep}. */
Expand Down Expand Up @@ -108,16 +116,40 @@ public List<String> getExpectedAfterRestoreAsStrings() {
}

public List<String> getExpectedAsStrings() {
final List<String> data = new ArrayList<>(getExpectedBeforeRestoreAsStrings());
data.addAll(getExpectedAfterRestoreAsStrings());
return data;
if (hasStringsSet() || deduplicatedFieldIndices == null) {
final List<String> data = new ArrayList<>(getExpectedBeforeRestoreAsStrings());
data.addAll(getExpectedAfterRestoreAsStrings());
return data;
}
Preconditions.checkState(hasRowsSet());
final List<Row> data = new ArrayList<>();
if (expectedBeforeRestore != null) {
data.addAll(expectedBeforeRestore);
}
if (expectedAfterRestore != null) {
data.addAll(expectedAfterRestore);
}

Map<List<Object>, Row> deduplicatedMap = new HashMap<>();
for (Row row : data) {
List<Object> key = new ArrayList<>(deduplicatedFieldIndices.length);
for (int i = 0; i < deduplicatedFieldIndices.length; i++) {
key.add(row.getField(deduplicatedFieldIndices[i]));
}
deduplicatedMap.put(key, row);
}
return deduplicatedMap.values().stream().map(Row::toString).collect(Collectors.toList());
}

public List<String> getExpectedMaterializedResultsAsStrings() {
if (expectedMaterializedStrings != null) {
return expectedMaterializedStrings;
}
if (expectedMaterializedRows != null) {
if (deduplicatedFieldIndices != null) {
throw new UnsupportedOperationException(
"Unsupported to deduplicate data for materialized rows");
}
return expectedMaterializedRows.stream()
.map(Row::toString)
.collect(Collectors.toList());
Expand All @@ -138,6 +170,18 @@ public boolean shouldTestChangelogData() {
return testChangelogData;
}

private boolean hasRowsSet() {
return expectedBeforeRestore != null
|| expectedAfterRestore != null
|| expectedMaterializedRows != null;
}

private boolean hasStringsSet() {
return expectedBeforeRestoreStrings != null
|| expectedAfterRestoreStrings != null
|| expectedMaterializedStrings != null;
}

/** Builder pattern for {@link SinkTestStep}. */
public static final class Builder extends AbstractBuilder<Builder> {

Expand All @@ -151,6 +195,8 @@ public static final class Builder extends AbstractBuilder<Builder> {

private boolean testChangelogData = true;

private @Nullable int[] deduplicatedFieldIndices;

private Builder(String name) {
super(name);
}
Expand Down Expand Up @@ -203,6 +249,14 @@ public Builder testMaterializedData() {
return this;
}

public Builder deduplicatedFieldIndices(int[] deduplicatedFieldIndices) {
// TODO FLINK-38518 use pk to deduplicate data rather than specific fields.
// This task requires refactoring the current `AbstractBuilder` to separate the
// declaration of the primary key from the `List<String> schemaComponents`.
this.deduplicatedFieldIndices = deduplicatedFieldIndices;
return this;
}

public SinkTestStep build() {
return new SinkTestStep(
name,
Expand All @@ -216,7 +270,8 @@ public SinkTestStep build() {
expectedAfterRestoreStrings,
expectedMaterializedBeforeRows,
expectedMaterializedBeforeStrings,
testChangelogData);
testChangelogData,
deduplicatedFieldIndices);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,22 @@ public final class SourceTestStep extends TableTestStep {
public final List<Row> dataBeforeRestore;
public final List<Row> dataAfterRestore;

public final boolean treatDataBeforeRestoreAsFullStageData;

SourceTestStep(
String name,
List<String> schemaComponents,
@Nullable TableDistribution distribution,
List<String> partitionKeys,
Map<String, String> options,
List<List<String>> indexes,
List<Row> dataBeforeRestore,
List<Row> dataAfterRestore) {
super(name, schemaComponents, distribution, partitionKeys, options);
List<Row> dataAfterRestore,
boolean treatDataBeforeRestoreAsFullStageData) {
super(name, schemaComponents, distribution, partitionKeys, options, indexes);
this.dataBeforeRestore = dataBeforeRestore;
this.dataAfterRestore = dataAfterRestore;
this.treatDataBeforeRestoreAsFullStageData = treatDataBeforeRestoreAsFullStageData;
}

/** Builder for creating a {@link SourceTestStep}. */
Expand All @@ -66,6 +71,8 @@ public static final class Builder extends AbstractBuilder<Builder> {

private final List<Row> dataBeforeRestore = new ArrayList<>();
private final List<Row> dataAfterRestore = new ArrayList<>();
private final List<List<String>> indexes = new ArrayList<>();
private boolean treatDataBeforeRestoreAsFullStageData = false;

private Builder(String name) {
super(name);
Expand All @@ -85,15 +92,27 @@ public Builder producedAfterRestore(Row... data) {
return this;
}

public Builder addIndex(String... index) {
this.indexes.add(Arrays.asList(index));
return this;
}

public Builder treatDataBeforeRestoreAsFullStageData() {
this.treatDataBeforeRestoreAsFullStageData = true;
return this;
}

public SourceTestStep build() {
return new SourceTestStep(
name,
schemaComponents,
distribution,
partitionKeys,
options,
indexes,
dataBeforeRestore,
dataAfterRestore);
dataAfterRestore,
treatDataBeforeRestoreAsFullStageData);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,16 @@ public Builder setupTableSource(SourceTestStep sourceTestStep) {
return this;
}

/**
* Setup steps for each table source.
*
* <p>Use {@link SourceTestStep.Builder} to construct this step.
*/
public Builder setupTableSources(List<SourceTestStep> sourceTestSteps) {
setupSteps.addAll(sourceTestSteps);
return this;
}

/**
* Setup step for a table sink.
*
Expand All @@ -327,6 +337,16 @@ public Builder setupTableSink(SinkTestStep sinkTestStep) {
return this;
}

/**
* Setup steps for each table sink.
*
* <p>Use {@link SinkTestStep.Builder} to construct this step.
*/
public Builder setupTableSinks(List<SinkTestStep> sinkTestSteps) {
setupSteps.addAll(sinkTestSteps);
return this;
}

/**
* Setup step for a model.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -58,25 +65,28 @@ public abstract class TableTestStep implements TestStep {
public final @Nullable TableDistribution distribution;
public final List<String> partitionKeys;
public final Map<String, String> options;
public final List<List<String>> indexes;

TableTestStep(
String name,
List<String> schemaComponents,
@Nullable TableDistribution distribution,
List<String> partitionKeys,
Map<String, String> options) {
Map<String, String> options,
List<List<String>> indexes) {
this.name = name;
this.schemaComponents = schemaComponents;
this.distribution = distribution;
this.partitionKeys = partitionKeys;
this.options = options;
this.indexes = indexes;
}

public TableResult apply(TableEnvironment env) {
return apply(env, Collections.emptyMap());
public void apply(TableEnvironment env) {
apply(env, Collections.emptyMap());
}

public TableResult apply(TableEnvironment env, Map<String, String> extraOptions) {
public void apply(TableEnvironment env, Map<String, String> extraOptions) {
final Map<String, String> allOptions = new HashMap<>(options);
allOptions.putAll(extraOptions);

Expand All @@ -97,7 +107,41 @@ public TableResult apply(TableEnvironment env, Map<String, String> extraOptions)
.map(e -> String.format("'%s'='%s'", e.getKey(), e.getValue()))
.collect(Collectors.joining(",\n")));

return env.executeSql(createTable);
env.executeSql(createTable);
if (indexes.isEmpty()) {
return;
}

Optional<Catalog> currentCatalogOp = env.getCatalog(env.getCurrentCatalog());
Preconditions.checkState(currentCatalogOp.isPresent());
Catalog catalog = currentCatalogOp.get();

String currentDatabaseName = env.getCurrentDatabase();
ObjectPath tablePath = new ObjectPath(currentDatabaseName, name);
CatalogTable oldTable;
try {
oldTable = (CatalogTable) catalog.getTable(tablePath);
catalog.dropTable(tablePath, false);
} catch (TableNotExistException e) {
throw new IllegalStateException(e);
}
Schema schema = oldTable.getUnresolvedSchema();
Schema.Builder schemaBuilder = Schema.newBuilder().fromSchema(schema);
indexes.forEach(schemaBuilder::index);
CatalogTable newTable =
CatalogTable.newBuilder()
.schema(schemaBuilder.build())
.comment(oldTable.getComment())
.partitionKeys(oldTable.getPartitionKeys())
.options(oldTable.getOptions())
.snapshot(oldTable.getSnapshot().orElse(null))
.distribution(oldTable.getDistribution().orElse(null))
.build();
try {
catalog.createTable(tablePath, newTable, false);
} catch (TableAlreadyExistException | DatabaseNotExistException e) {
throw new IllegalStateException(e);
}
}

/** Builder pattern for {@link SourceTestStep} and {@link SinkTestStep}. */
Expand Down
Loading