|  | 
| 85 | 85 | import org.assertj.core.api.HamcrestCondition; | 
| 86 | 86 | import org.assertj.core.api.InstanceOfAssertFactories; | 
| 87 | 87 | import org.junit.jupiter.api.Test; | 
|  | 88 | +import org.junit.jupiter.params.ParameterizedTest; | 
|  | 89 | +import org.junit.jupiter.params.provider.Arguments; | 
|  | 90 | +import org.junit.jupiter.params.provider.MethodSource; | 
| 88 | 91 | 
 | 
| 89 | 92 | import javax.annotation.Nullable; | 
| 90 | 93 | 
 | 
|  | 
| 96 | 99 | import java.util.Optional; | 
| 97 | 100 | import java.util.Set; | 
| 98 | 101 | import java.util.TreeMap; | 
|  | 102 | +import java.util.function.Consumer; | 
| 99 | 103 | import java.util.stream.Collectors; | 
|  | 104 | +import java.util.stream.Stream; | 
| 100 | 105 | 
 | 
| 101 | 106 | import static org.apache.flink.table.api.Expressions.$; | 
| 102 | 107 | import static org.apache.flink.table.planner.utils.OperationMatchers.entry; | 
| @@ -1404,97 +1409,93 @@ void testFailedToAlterTableDropDistribution() throws Exception { | 
| 1404 | 1409 |         checkAlterNonExistTable("alter table %s nonexistent drop watermark"); | 
| 1405 | 1410 |     } | 
| 1406 | 1411 | 
 | 
| 1407 |  | -    @Test | 
| 1408 |  | -    void createMaterializedTableWithRefreshModeContinuous() throws Exception { | 
| 1409 |  | -        final String sql = | 
| 1410 |  | -                "CREATE MATERIALIZED TABLE users_shops (" | 
| 1411 |  | -                        + " PRIMARY KEY (user_id) not enforced)" | 
| 1412 |  | -                        + " WITH(\n" | 
| 1413 |  | -                        + "   'format' = 'debezium-json'\n" | 
| 1414 |  | -                        + " )\n" | 
| 1415 |  | -                        + " FRESHNESS = INTERVAL '30' SECOND\n" | 
| 1416 |  | -                        + " REFRESH_MODE = CONTINUOUS\n" | 
| 1417 |  | -                        + " AS SELECT 1 as shop_id, 2 as user_id "; | 
| 1418 |  | - | 
| 1419 |  | -        final String expectedSummaryString = | 
| 1420 |  | -                "CREATE MATERIALIZED TABLE: (materializedTable: " | 
| 1421 |  | -                        + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n" | 
| 1422 |  | -                        + "  `shop_id` INT NOT NULL,\n" | 
| 1423 |  | -                        + "  `user_id` INT NOT NULL,\n" | 
| 1424 |  | -                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" | 
| 1425 |  | -                        + "), comment='null', distribution=null, partitionKeys=[], " | 
| 1426 |  | -                        + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', " | 
| 1427 |  | -                        + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, " | 
| 1428 |  | -                        + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n" | 
| 1429 |  | -                        + "  `shop_id` INT NOT NULL,\n" | 
| 1430 |  | -                        + "  `user_id` INT NOT NULL,\n" | 
| 1431 |  | -                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" | 
| 1432 |  | -                        + ")}], identifier: [`builtin`.`default`.`users_shops`])"; | 
|  | 1412 | +    @ParameterizedTest(name = "[{index}] {0}") | 
|  | 1413 | +    @MethodSource("provideCreateMaterializedTableTestCases") | 
|  | 1414 | +    void createMaterializedTableWithVariousOptions( | 
|  | 1415 | +            String testName, | 
|  | 1416 | +            String sql, | 
|  | 1417 | +            String expectedSummaryString, | 
|  | 1418 | +            Consumer<CreateMaterializedTableOperation> additionalAssertions) { | 
| 1433 | 1419 | 
 | 
| 1434 | 1420 |         final Operation operation = parse(sql); | 
| 1435 | 1421 | 
 | 
| 1436 | 1422 |         assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); | 
| 1437 | 1423 |         assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString); | 
|  | 1424 | + | 
| 1438 | 1425 |         final CreateMaterializedTableOperation createMaterializedTableOperation = | 
| 1439 | 1426 |                 (CreateMaterializedTableOperation) operation; | 
| 1440 |  | -        assertThat( | 
| 1441 |  | -                        createMaterializedTableOperation | 
| 1442 |  | -                                .getCatalogMaterializedTable() | 
| 1443 |  | -                                .getDefinitionFreshness()) | 
| 1444 |  | -                .isEqualTo(IntervalFreshness.ofSecond("30")); | 
| 1445 |  | -        assertThat(createMaterializedTableOperation.getCatalogMaterializedTable().getRefreshMode()) | 
| 1446 |  | -                .isSameAs(RefreshMode.CONTINUOUS); | 
| 1447 | 1427 | 
 | 
| 1448 |  | -        prepareMaterializedTable("tb2", false, 1, null, "SELECT 1"); | 
| 1449 |  | -    } | 
| 1450 |  | - | 
| 1451 |  | -    @Test | 
| 1452 |  | -    void createMaterializedTableWithDistribution() throws Exception { | 
| 1453 |  | -        final String sql = | 
| 1454 |  | -                "CREATE MATERIALIZED TABLE users_shops (" | 
| 1455 |  | -                        + " PRIMARY KEY (user_id) not enforced)" | 
| 1456 |  | -                        + " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n" | 
| 1457 |  | -                        + " WITH(\n" | 
| 1458 |  | -                        + "   'format' = 'debezium-json'\n" | 
| 1459 |  | -                        + " )\n" | 
| 1460 |  | -                        + " FRESHNESS = INTERVAL '30' SECOND\n" | 
| 1461 |  | -                        + " AS SELECT 1 as shop_id, 2 as user_id "; | 
| 1462 |  | - | 
| 1463 |  | -        final String expectedSummaryString = | 
| 1464 |  | -                "CREATE MATERIALIZED TABLE: (materializedTable: " | 
| 1465 |  | -                        + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n" | 
| 1466 |  | -                        + "  `shop_id` INT NOT NULL,\n" | 
| 1467 |  | -                        + "  `user_id` INT NOT NULL,\n" | 
| 1468 |  | -                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" | 
| 1469 |  | -                        + "), comment='null', distribution=DISTRIBUTED BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], " | 
| 1470 |  | -                        + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', " | 
| 1471 |  | -                        + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=AUTOMATIC, refreshMode=null, " | 
| 1472 |  | -                        + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n" | 
| 1473 |  | -                        + "  `shop_id` INT NOT NULL,\n" | 
| 1474 |  | -                        + "  `user_id` INT NOT NULL,\n" | 
| 1475 |  | -                        + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" | 
| 1476 |  | -                        + ")}], identifier: [`builtin`.`default`.`users_shops`])"; | 
| 1477 |  | - | 
| 1478 |  | -        final Operation operation = parse(sql); | 
| 1479 |  | - | 
| 1480 |  | -        assertThat(operation).isInstanceOf(CreateMaterializedTableOperation.class); | 
| 1481 |  | -        assertThat(operation.asSummaryString()).isEqualTo(expectedSummaryString); | 
| 1482 |  | -        assertThat( | 
| 1483 |  | -                        ((CreateMaterializedTableOperation) operation) | 
| 1484 |  | -                                .getCatalogMaterializedTable() | 
| 1485 |  | -                                .getDistribution() | 
| 1486 |  | -                                .get()) | 
| 1487 |  | -                .isEqualTo(TableDistribution.of(Kind.HASH, 7, List.of("user_id"))); | 
| 1488 |  | - | 
| 1489 |  | -        prepareMaterializedTable("tb2", false, 1, null, "SELECT 1"); | 
| 1490 |  | - | 
| 1491 |  | -        assertThatThrownBy( | 
| 1492 |  | -                        () -> | 
| 1493 |  | -                                parse( | 
| 1494 |  | -                                        "alter MATERIALIZED table cat1.db1.tb2 modify distribution into 3 buckets")) | 
| 1495 |  | -                .isInstanceOf(ValidationException.class) | 
| 1496 |  | -                .hasMessageContaining( | 
| 1497 |  | -                        "Materialized table `cat1`.`db1`.`tb2` does not have a distribution to modify."); | 
|  | 1428 | +        additionalAssertions.accept(createMaterializedTableOperation); | 
|  | 1429 | +    } | 
|  | 1430 | + | 
|  | 1431 | +    private static Stream<Arguments> provideCreateMaterializedTableTestCases() { | 
|  | 1432 | +        return Stream.of( | 
|  | 1433 | +                Arguments.of( | 
|  | 1434 | +                        "with refresh mode continuous", | 
|  | 1435 | +                        "CREATE MATERIALIZED TABLE users_shops (" | 
|  | 1436 | +                                + " PRIMARY KEY (user_id) not enforced)" | 
|  | 1437 | +                                + " WITH(\n" | 
|  | 1438 | +                                + "   'format' = 'debezium-json'\n" | 
|  | 1439 | +                                + " )\n" | 
|  | 1440 | +                                + " FRESHNESS = INTERVAL '30' SECOND\n" | 
|  | 1441 | +                                + " REFRESH_MODE = CONTINUOUS\n" | 
|  | 1442 | +                                + " AS SELECT 1 as shop_id, 2 as user_id ", | 
|  | 1443 | +                        "CREATE MATERIALIZED TABLE: (materializedTable: " | 
|  | 1444 | +                                + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n" | 
|  | 1445 | +                                + "  `shop_id` INT NOT NULL,\n" | 
|  | 1446 | +                                + "  `user_id` INT NOT NULL,\n" | 
|  | 1447 | +                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" | 
|  | 1448 | +                                + "), comment='null', distribution=null, partitionKeys=[], " | 
|  | 1449 | +                                + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', " | 
|  | 1450 | +                                + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=CONTINUOUS, refreshMode=CONTINUOUS, " | 
|  | 1451 | +                                + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n" | 
|  | 1452 | +                                + "  `shop_id` INT NOT NULL,\n" | 
|  | 1453 | +                                + "  `user_id` INT NOT NULL,\n" | 
|  | 1454 | +                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" | 
|  | 1455 | +                                + ")}], identifier: [`builtin`.`default`.`users_shops`])", | 
|  | 1456 | +                        (Consumer<CreateMaterializedTableOperation>) | 
|  | 1457 | +                                op -> { | 
|  | 1458 | +                                    assertThat( | 
|  | 1459 | +                                                    op.getCatalogMaterializedTable() | 
|  | 1460 | +                                                            .getDefinitionFreshness()) | 
|  | 1461 | +                                            .isEqualTo(IntervalFreshness.ofSecond("30")); | 
|  | 1462 | +                                    assertThat(op.getCatalogMaterializedTable().getRefreshMode()) | 
|  | 1463 | +                                            .isSameAs(RefreshMode.CONTINUOUS); | 
|  | 1464 | +                                }), | 
|  | 1465 | +                Arguments.of( | 
|  | 1466 | +                        "with distribution", | 
|  | 1467 | +                        "CREATE MATERIALIZED TABLE users_shops (" | 
|  | 1468 | +                                + " PRIMARY KEY (user_id) not enforced)" | 
|  | 1469 | +                                + " DISTRIBUTED BY HASH (user_id) INTO 7 BUCKETS\n" | 
|  | 1470 | +                                + " WITH(\n" | 
|  | 1471 | +                                + "   'format' = 'debezium-json'\n" | 
|  | 1472 | +                                + " )\n" | 
|  | 1473 | +                                + " FRESHNESS = INTERVAL '30' SECOND\n" | 
|  | 1474 | +                                + " AS SELECT 1 as shop_id, 2 as user_id ", | 
|  | 1475 | +                        "CREATE MATERIALIZED TABLE: (materializedTable: " | 
|  | 1476 | +                                + "[ResolvedCatalogMaterializedTable{origin=DefaultCatalogMaterializedTable{schema=(\n" | 
|  | 1477 | +                                + "  `shop_id` INT NOT NULL,\n" | 
|  | 1478 | +                                + "  `user_id` INT NOT NULL,\n" | 
|  | 1479 | +                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" | 
|  | 1480 | +                                + "), comment='null', distribution=DISTRIBUTED BY HASH(`user_id`) INTO 7 BUCKETS, partitionKeys=[], " | 
|  | 1481 | +                                + "options={format=debezium-json}, snapshot=null, definitionQuery='SELECT 1 AS `shop_id`, 2 AS `user_id`', " | 
|  | 1482 | +                                + "freshness=INTERVAL '30' SECOND, logicalRefreshMode=AUTOMATIC, refreshMode=null, " | 
|  | 1483 | +                                + "refreshStatus=INITIALIZING, refreshHandlerDescription='null', serializedRefreshHandler=null}, resolvedSchema=(\n" | 
|  | 1484 | +                                + "  `shop_id` INT NOT NULL,\n" | 
|  | 1485 | +                                + "  `user_id` INT NOT NULL,\n" | 
|  | 1486 | +                                + "  CONSTRAINT `PK_user_id` PRIMARY KEY (`user_id`) NOT ENFORCED\n" | 
|  | 1487 | +                                + ")}], identifier: [`builtin`.`default`.`users_shops`])", | 
|  | 1488 | +                        (Consumer<CreateMaterializedTableOperation>) | 
|  | 1489 | +                                op -> | 
|  | 1490 | +                                        assertThat( | 
|  | 1491 | +                                                        op.getCatalogMaterializedTable() | 
|  | 1492 | +                                                                .getDistribution() | 
|  | 1493 | +                                                                .get()) | 
|  | 1494 | +                                                .isEqualTo( | 
|  | 1495 | +                                                        TableDistribution.of( | 
|  | 1496 | +                                                                Kind.HASH, | 
|  | 1497 | +                                                                7, | 
|  | 1498 | +                                                                List.of("user_id"))))); | 
| 1498 | 1499 |     } | 
| 1499 | 1500 | 
 | 
| 1500 | 1501 |     @Test | 
|  | 
0 commit comments