diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java index 003308e9a6cbd..5ad625971d5d7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeAllWindowCheckpointingITCase.java @@ -47,7 +47,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * This verifies that checkpointing works correctly with event time windows. @@ -79,320 +78,300 @@ private static Configuration getConfiguration() { // ------------------------------------------------------------------------ @Test - public void testTumblingTimeWindow() { + public void testTumblingTimeWindow() throws Exception { final int numElementsPerKey = 3000; final int windowSize = 100; final int numKeys = 1; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - - env.addSource( - new FailingSource( - new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator( - numKeys, windowSize), - numElementsPerKey)) - .rebalance() - .windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) - .apply( - new RichAllWindowFunction< - Tuple2, - Tuple4, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - 1, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + + env.addSource( + new FailingSource( + new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator( + numKeys, windowSize), + numElementsPerKey)) + .rebalance() + .windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) + .apply( + new RichAllWindowFunction< + Tuple2, + Tuple4, + TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + 1, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + TimeWindow window, + Iterable> values, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + int sum = 0; + long key = -1; + + for (Tuple2 value : values) { + sum += value.f1.value; + key = value.f0; } - - @Override - public void apply( - TimeWindow window, - Iterable> values, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - int sum = 0; - long key = -1; - - for (Tuple2 value : values) { - sum += value.f1.value; - key = value.f0; - } - out.collect( - new Tuple4<>( - key, - window.getStart(), - window.getEnd(), - new IntType(sum))); - } - }) - .addSink( - new ValidatingSink<>( - new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun( - numElementsPerKey), - new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun( - numKeys, numElementsPerKey, windowSize))) - .setParallelism(1); - - env.execute("Tumbling Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + out.collect( + new Tuple4<>( + key, + window.getStart(), + window.getEnd(), + new IntType(sum))); + } + }) + .addSink( + new ValidatingSink<>( + new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun( + numElementsPerKey), + new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun( + numKeys, numElementsPerKey, windowSize))) + .setParallelism(1); + + env.execute("Tumbling Window Test"); } @Test - public void testSlidingTimeWindow() { + public void testSlidingTimeWindow() throws Exception { final int numElementsPerKey = 3000; final int windowSize = 1000; final int windowSlide = 100; final int numKeys = 1; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - - env.addSource( - new FailingSource( - new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator( - numKeys, windowSlide), - numElementsPerKey)) - .rebalance() - .windowAll( - SlidingEventTimeWindows.of( - Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide))) - .apply( - new RichAllWindowFunction< - Tuple2, - Tuple4, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - 1, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - TimeWindow window, - Iterable> values, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - int sum = 0; - long key = -1; - - for (Tuple2 value : values) { - sum += value.f1.value; - key = value.f0; - } - out.collect( - new Tuple4<>( - key, - window.getStart(), - window.getEnd(), - new IntType(sum))); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + + env.addSource( + new FailingSource( + new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator( + numKeys, windowSlide), + numElementsPerKey)) + .rebalance() + .windowAll( + SlidingEventTimeWindows.of( + Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide))) + .apply( + new RichAllWindowFunction< + Tuple2, + Tuple4, + TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + 1, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + TimeWindow window, + Iterable> values, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + int sum = 0; + long key = -1; + + for (Tuple2 value : values) { + sum += value.f1.value; + key = value.f0; } - }) - .addSink( - new ValidatingSink<>( - new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun( - numElementsPerKey), - new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun( - numKeys, numElementsPerKey, windowSlide))) - .setParallelism(1); - - env.execute("Sliding Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + out.collect( + new Tuple4<>( + key, + window.getStart(), + window.getEnd(), + new IntType(sum))); + } + }) + .addSink( + new ValidatingSink<>( + new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun( + numElementsPerKey), + new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun( + numKeys, numElementsPerKey, windowSlide))) + .setParallelism(1); + + env.execute("Sliding Window Test"); } @Test - public void testPreAggregatedTumblingTimeWindow() { + public void testPreAggregatedTumblingTimeWindow() throws Exception { final int numElementsPerKey = 3000; final int windowSize = 100; final int numKeys = 1; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - - env.addSource( - new FailingSource( - new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator( - numKeys, windowSize), - numElementsPerKey)) - .rebalance() - .windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) - .reduce( - new ReduceFunction>() { - - @Override - public Tuple2 reduce( - Tuple2 a, Tuple2 b) { - - return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); - } - }, - new RichAllWindowFunction< - Tuple2, - Tuple4, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - 1, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - TimeWindow window, - Iterable> input, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - for (Tuple2 in : input) { - out.collect( - new Tuple4<>( - in.f0, - window.getStart(), - window.getEnd(), - in.f1)); - } + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + + env.addSource( + new FailingSource( + new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator( + numKeys, windowSize), + numElementsPerKey)) + .rebalance() + .windowAll(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) + .reduce( + new ReduceFunction>() { + + @Override + public Tuple2 reduce( + Tuple2 a, Tuple2 b) { + + return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); + } + }, + new RichAllWindowFunction< + Tuple2, + Tuple4, + TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + 1, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + TimeWindow window, + Iterable> input, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + for (Tuple2 in : input) { + out.collect( + new Tuple4<>( + in.f0, + window.getStart(), + window.getEnd(), + in.f1)); } - }) - .addSink( - new ValidatingSink<>( - new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun( - numElementsPerKey), - new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun( - numKeys, numElementsPerKey, windowSize))) - .setParallelism(1); - - env.execute("PreAggregated Tumbling Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + } + }) + .addSink( + new ValidatingSink<>( + new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun( + numElementsPerKey), + new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun( + numKeys, numElementsPerKey, windowSize))) + .setParallelism(1); + + env.execute("PreAggregated Tumbling Window Test"); } @Test - public void testPreAggregatedSlidingTimeWindow() { + public void testPreAggregatedSlidingTimeWindow() throws Exception { final int numElementsPerKey = 3000; final int windowSize = 1000; final int windowSlide = 100; final int numKeys = 1; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - - env.addSource( - new FailingSource( - new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator( - numKeys, windowSlide), - numElementsPerKey)) - .rebalance() - .windowAll( - SlidingEventTimeWindows.of( - Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide))) - .reduce( - new ReduceFunction>() { - - @Override - public Tuple2 reduce( - Tuple2 a, Tuple2 b) { - - return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); - } - }, - new RichAllWindowFunction< - Tuple2, - Tuple4, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - 1, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - TimeWindow window, - Iterable> input, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - for (Tuple2 in : input) { - out.collect( - new Tuple4<>( - in.f0, - window.getStart(), - window.getEnd(), - in.f1)); - } + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + + env.addSource( + new FailingSource( + new EventTimeWindowCheckpointingITCase.KeyedEventTimeGenerator( + numKeys, windowSlide), + numElementsPerKey)) + .rebalance() + .windowAll( + SlidingEventTimeWindows.of( + Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide))) + .reduce( + new ReduceFunction>() { + + @Override + public Tuple2 reduce( + Tuple2 a, Tuple2 b) { + + return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); + } + }, + new RichAllWindowFunction< + Tuple2, + Tuple4, + TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + 1, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + TimeWindow window, + Iterable> input, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + for (Tuple2 in : input) { + out.collect( + new Tuple4<>( + in.f0, + window.getStart(), + window.getEnd(), + in.f1)); } - }) - .addSink( - new ValidatingSink<>( - new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun( - numElementsPerKey), - new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun( - numKeys, numElementsPerKey, windowSlide))) - .setParallelism(1); - - env.execute("PreAggregated Sliding Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + } + }) + .addSink( + new ValidatingSink<>( + new EventTimeWindowCheckpointingITCase.SinkValidatorUpdateFun( + numElementsPerKey), + new EventTimeWindowCheckpointingITCase.SinkValidatorCheckFun( + numKeys, numElementsPerKey, windowSlide))) + .setParallelism(1); + + env.execute("PreAggregated Sliding Window Test"); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index ca07e3a6ab278..161062e682634 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -76,7 +76,6 @@ import static org.apache.flink.test.checkpointing.EventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * This verifies that checkpointing works correctly with event time windows. This is more strict @@ -308,435 +307,405 @@ public void stopTestCluster() throws IOException { // ------------------------------------------------------------------------ @Test - public void testTumblingTimeWindow() { + public void testTumblingTimeWindow() throws Exception { final int numElementsPerKey = numElementsPerKey(); final int windowSize = windowSize(); final int numKeys = numKeys(); - try { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(configuration); - env.setParallelism(PARALLELISM); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - env.getConfig().setUseSnapshotCompression(true); - - env.addSource( - new FailingSource( - new KeyedEventTimeGenerator(numKeys, windowSize), - numElementsPerKey)) - .rebalance() - .keyBy(x -> x.f0) - .window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) - .apply( - new RichWindowFunction< - Tuple2, - Tuple4, - Long, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - PARALLELISM, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + env.getConfig().setUseSnapshotCompression(true); + + env.addSource( + new FailingSource( + new KeyedEventTimeGenerator(numKeys, windowSize), + numElementsPerKey)) + .rebalance() + .keyBy(x -> x.f0) + .window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) + .apply( + new RichWindowFunction< + Tuple2, + Tuple4, + Long, + TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + PARALLELISM, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Long l, + TimeWindow window, + Iterable> values, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + int sum = 0; + long key = -1; + + for (Tuple2 value : values) { + sum += value.f1.value; + key = value.f0; } - @Override - public void apply( - Long l, - TimeWindow window, - Iterable> values, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - int sum = 0; - long key = -1; - - for (Tuple2 value : values) { - sum += value.f1.value; - key = value.f0; - } - - final Tuple4 result = - new Tuple4<>( - key, - window.getStart(), - window.getEnd(), - new IntType(sum)); - out.collect(result); - } - }) - .addSink( - new ValidatingSink<>( - new SinkValidatorUpdateFun(numElementsPerKey), - new SinkValidatorCheckFun( - numKeys, numElementsPerKey, windowSize))) - .setParallelism(1); - - env.execute("Tumbling Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + final Tuple4 result = + new Tuple4<>( + key, + window.getStart(), + window.getEnd(), + new IntType(sum)); + out.collect(result); + } + }) + .addSink( + new ValidatingSink<>( + new SinkValidatorUpdateFun(numElementsPerKey), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))) + .setParallelism(1); + + env.execute("Tumbling Window Test"); } @Test - public void testTumblingTimeWindowWithKVStateMinMaxParallelism() { + public void testTumblingTimeWindowWithKVStateMinMaxParallelism() throws Exception { doTestTumblingTimeWindowWithKVState(PARALLELISM); } @Test - public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() { + public void testTumblingTimeWindowWithKVStateMaxMaxParallelism() throws Exception { doTestTumblingTimeWindowWithKVState(1 << 15); } - public void doTestTumblingTimeWindowWithKVState(int maxParallelism) { + public void doTestTumblingTimeWindowWithKVState(int maxParallelism) throws Exception { final int numElementsPerKey = numElementsPerKey(); final int windowSize = windowSize(); final int numKeys = numKeys(); - try { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(configuration); - env.setParallelism(PARALLELISM); - env.setMaxParallelism(maxParallelism); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - env.getConfig().setUseSnapshotCompression(true); - - env.addSource( - new FailingSource( - new KeyedEventTimeGenerator(numKeys, windowSize), - numElementsPerKey)) - .rebalance() - .keyBy(x -> x.f0) - .window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) - .apply( - new RichWindowFunction< - Tuple2, - Tuple4, - Long, - TimeWindow>() { - - private boolean open = false; - - private ValueState count; - - @Override - public void open(OpenContext openContext) { - assertEquals( - PARALLELISM, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; - count = - getRuntimeContext() - .getState( - new ValueStateDescriptor<>( - "count", Integer.class, 0)); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + env.setParallelism(PARALLELISM); + env.setMaxParallelism(maxParallelism); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + env.getConfig().setUseSnapshotCompression(true); + + env.addSource( + new FailingSource( + new KeyedEventTimeGenerator(numKeys, windowSize), + numElementsPerKey)) + .rebalance() + .keyBy(x -> x.f0) + .window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) + .apply( + new RichWindowFunction< + Tuple2, + Tuple4, + Long, + TimeWindow>() { + + private boolean open = false; + + private ValueState count; + + @Override + public void open(OpenContext openContext) { + assertEquals( + PARALLELISM, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + count = + getRuntimeContext() + .getState( + new ValueStateDescriptor<>( + "count", Integer.class, 0)); + } + + @Override + public void apply( + Long l, + TimeWindow window, + Iterable> values, + Collector> out) + throws Exception { + + // the window count state starts with the key, so that we get + // different count results for each key + if (count.value() == 0) { + count.update(l.intValue()); } - @Override - public void apply( - Long l, - TimeWindow window, - Iterable> values, - Collector> out) - throws Exception { - - // the window count state starts with the key, so that we get - // different count results for each key - if (count.value() == 0) { - count.update(l.intValue()); - } - - // validate that the function has been opened properly - assertTrue(open); - - count.update(count.value() + 1); - out.collect( - new Tuple4<>( - l, - window.getStart(), - window.getEnd(), - new IntType(count.value()))); - } - }) - .addSink( - new ValidatingSink<>( - new CountingSinkValidatorUpdateFun(), - new SinkValidatorCheckFun( - numKeys, numElementsPerKey, windowSize))) - .setParallelism(1); - - env.execute("Tumbling Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + // validate that the function has been opened properly + assertTrue(open); + + count.update(count.value() + 1); + out.collect( + new Tuple4<>( + l, + window.getStart(), + window.getEnd(), + new IntType(count.value()))); + } + }) + .addSink( + new ValidatingSink<>( + new CountingSinkValidatorUpdateFun(), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))) + .setParallelism(1); + + env.execute("Tumbling Window Test"); } @Test - public void testSlidingTimeWindow() { + public void testSlidingTimeWindow() throws Exception { final int numElementsPerKey = numElementsPerKey(); final int windowSize = windowSize(); final int windowSlide = windowSlide(); final int numKeys = numKeys(); - try { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(configuration); - env.setMaxParallelism(2 * PARALLELISM); - env.setParallelism(PARALLELISM); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - env.getConfig().setUseSnapshotCompression(true); - - env.addSource( - new FailingSource( - new KeyedEventTimeGenerator(numKeys, windowSlide), - numElementsPerKey)) - .rebalance() - .keyBy(x -> x.f0) - .window( - SlidingEventTimeWindows.of( - Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide))) - .apply( - new RichWindowFunction< - Tuple2, - Tuple4, - Long, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - PARALLELISM, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - Long l, - TimeWindow window, - Iterable> values, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - int sum = 0; - long key = -1; - - for (Tuple2 value : values) { - sum += value.f1.value; - key = value.f0; - } - final Tuple4 output = - new Tuple4<>( - key, - window.getStart(), - window.getEnd(), - new IntType(sum)); - out.collect(output); + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + env.setMaxParallelism(2 * PARALLELISM); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + env.getConfig().setUseSnapshotCompression(true); + + env.addSource( + new FailingSource( + new KeyedEventTimeGenerator(numKeys, windowSlide), + numElementsPerKey)) + .rebalance() + .keyBy(x -> x.f0) + .window( + SlidingEventTimeWindows.of( + Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide))) + .apply( + new RichWindowFunction< + Tuple2, + Tuple4, + Long, + TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + PARALLELISM, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Long l, + TimeWindow window, + Iterable> values, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + int sum = 0; + long key = -1; + + for (Tuple2 value : values) { + sum += value.f1.value; + key = value.f0; } - }) - .addSink( - new ValidatingSink<>( - new SinkValidatorUpdateFun(numElementsPerKey), - new SinkValidatorCheckFun( - numKeys, numElementsPerKey, windowSlide))) - .setParallelism(1); - - env.execute("Tumbling Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + final Tuple4 output = + new Tuple4<>( + key, + window.getStart(), + window.getEnd(), + new IntType(sum)); + out.collect(output); + } + }) + .addSink( + new ValidatingSink<>( + new SinkValidatorUpdateFun(numElementsPerKey), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))) + .setParallelism(1); + + env.execute("Tumbling Window Test"); } @Test - public void testPreAggregatedTumblingTimeWindow() { + public void testPreAggregatedTumblingTimeWindow() throws Exception { final int numElementsPerKey = numElementsPerKey(); final int windowSize = windowSize(); final int numKeys = numKeys(); - try { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(configuration); - env.setParallelism(PARALLELISM); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - env.getConfig().setUseSnapshotCompression(true); - - env.addSource( - new FailingSource( - new KeyedEventTimeGenerator(numKeys, windowSize), - numElementsPerKey)) - .rebalance() - .keyBy(x -> x.f0) - .window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) - .reduce( - new ReduceFunction>() { - - @Override - public Tuple2 reduce( - Tuple2 a, Tuple2 b) { - return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); - } - }, - new RichWindowFunction< - Tuple2, - Tuple4, - Long, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - PARALLELISM, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - Long l, - TimeWindow window, - Iterable> input, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - for (Tuple2 in : input) { - final Tuple4 output = - new Tuple4<>( - in.f0, - window.getStart(), - window.getEnd(), - in.f1); - out.collect(output); - } + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + env.getConfig().setUseSnapshotCompression(true); + + env.addSource( + new FailingSource( + new KeyedEventTimeGenerator(numKeys, windowSize), + numElementsPerKey)) + .rebalance() + .keyBy(x -> x.f0) + .window(TumblingEventTimeWindows.of(Duration.ofMillis(windowSize))) + .reduce( + new ReduceFunction>() { + + @Override + public Tuple2 reduce( + Tuple2 a, Tuple2 b) { + return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); + } + }, + new RichWindowFunction< + Tuple2, + Tuple4, + Long, + TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + PARALLELISM, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Long l, + TimeWindow window, + Iterable> input, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + for (Tuple2 in : input) { + final Tuple4 output = + new Tuple4<>( + in.f0, + window.getStart(), + window.getEnd(), + in.f1); + out.collect(output); } - }) - .addSink( - new ValidatingSink<>( - new SinkValidatorUpdateFun(numElementsPerKey), - new SinkValidatorCheckFun( - numKeys, numElementsPerKey, windowSize))) - .setParallelism(1); - - env.execute("Tumbling Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + } + }) + .addSink( + new ValidatingSink<>( + new SinkValidatorUpdateFun(numElementsPerKey), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSize))) + .setParallelism(1); + + env.execute("Tumbling Window Test"); } @Test - public void testPreAggregatedSlidingTimeWindow() { + public void testPreAggregatedSlidingTimeWindow() throws Exception { final int numElementsPerKey = numElementsPerKey(); final int windowSize = windowSize(); final int windowSlide = windowSlide(); final int numKeys = numKeys(); - try { - StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(configuration); - env.setParallelism(PARALLELISM); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - env.getConfig().setUseSnapshotCompression(true); - - env.addSource( - new FailingSource( - new KeyedEventTimeGenerator(numKeys, windowSlide), - numElementsPerKey)) - .rebalance() - .keyBy(x -> x.f0) - .window( - SlidingEventTimeWindows.of( - Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide))) - .reduce( - new ReduceFunction>() { - - @Override - public Tuple2 reduce( - Tuple2 a, Tuple2 b) { - - // validate that the function has been opened properly - return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); - } - }, - new RichWindowFunction< - Tuple2, - Tuple4, - Long, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - PARALLELISM, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; - } - - @Override - public void apply( - Long l, - TimeWindow window, - Iterable> input, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - for (Tuple2 in : input) { - out.collect( - new Tuple4<>( - in.f0, - window.getStart(), - window.getEnd(), - in.f1)); - } + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + env.getConfig().setUseSnapshotCompression(true); + + env.addSource( + new FailingSource( + new KeyedEventTimeGenerator(numKeys, windowSlide), + numElementsPerKey)) + .rebalance() + .keyBy(x -> x.f0) + .window( + SlidingEventTimeWindows.of( + Duration.ofMillis(windowSize), Duration.ofMillis(windowSlide))) + .reduce( + new ReduceFunction>() { + + @Override + public Tuple2 reduce( + Tuple2 a, Tuple2 b) { + + // validate that the function has been opened properly + return new Tuple2<>(a.f0, new IntType(a.f1.value + b.f1.value)); + } + }, + new RichWindowFunction< + Tuple2, + Tuple4, + Long, + TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + PARALLELISM, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Long l, + TimeWindow window, + Iterable> input, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + for (Tuple2 in : input) { + out.collect( + new Tuple4<>( + in.f0, + window.getStart(), + window.getEnd(), + in.f1)); } - }) - .addSink( - new ValidatingSink<>( - new SinkValidatorUpdateFun(numElementsPerKey), - new SinkValidatorCheckFun( - numKeys, numElementsPerKey, windowSlide))) - .setParallelism(1); - - env.execute("Tumbling Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + } + }) + .addSink( + new ValidatingSink<>( + new SinkValidatorUpdateFun(numElementsPerKey), + new SinkValidatorCheckFun(numKeys, numElementsPerKey, windowSlide))) + .setParallelism(1); + + env.execute("Tumbling Window Test"); } // ------------------------------------------------------------------------ diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java index 1ebbbd35f0af7..d58905e1ef62d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ProcessingTimeWindowCheckpointingITCase.java @@ -78,216 +78,190 @@ private static Configuration getConfiguration() { // ------------------------------------------------------------------------ @Test - public void testTumblingProcessingTimeWindow() { + public void testTumblingProcessingTimeWindow() throws Exception { final int numElements = 3000; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.getConfig().setAutoWatermarkInterval(10); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - - SinkValidatorUpdaterAndChecker updaterAndChecker = - new SinkValidatorUpdaterAndChecker(numElements, 1); - - env.addSource(new FailingSource(new Generator(), numElements, true)) - .rebalance() - .keyBy(x -> x.f0) - .window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100))) - .apply( - new RichWindowFunction< - Tuple2, - Tuple2, - Long, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - PARALLELISM, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.getConfig().setAutoWatermarkInterval(10); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + + SinkValidatorUpdaterAndChecker updaterAndChecker = + new SinkValidatorUpdaterAndChecker(numElements, 1); + + env.addSource(new FailingSource(new Generator(), numElements, true)) + .rebalance() + .keyBy(x -> x.f0) + .window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100))) + .apply( + new RichWindowFunction< + Tuple2, Tuple2, Long, TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + PARALLELISM, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Long l, + TimeWindow window, + Iterable> values, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + for (Tuple2 value : values) { + assertEquals(value.f0.intValue(), value.f1.value); + out.collect(new Tuple2<>(value.f0, new IntType(1))); } + } + }) + .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, true)) + .setParallelism(1); - @Override - public void apply( - Long l, - TimeWindow window, - Iterable> values, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - for (Tuple2 value : values) { - assertEquals(value.f0.intValue(), value.f1.value); - out.collect(new Tuple2<>(value.f0, new IntType(1))); - } - } - }) - .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, true)) - .setParallelism(1); - - tryExecute(env, "Tumbling Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + tryExecute(env, "Tumbling Window Test"); } @Test - public void testSlidingProcessingTimeWindow() { + public void testSlidingProcessingTimeWindow() throws Exception { final int numElements = 3000; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.getConfig().setAutoWatermarkInterval(10); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - SinkValidatorUpdaterAndChecker updaterAndChecker = - new SinkValidatorUpdaterAndChecker(numElements, 3); - env.addSource(new FailingSource(new Generator(), numElements, true)) - .rebalance() - .keyBy(x -> x.f0) - .window( - SlidingProcessingTimeWindows.of( - Duration.ofMillis(150), Duration.ofMillis(50))) - .apply( - new RichWindowFunction< - Tuple2, - Tuple2, - Long, - TimeWindow>() { - - private boolean open = false; - - @Override - public void open(OpenContext openContext) { - assertEquals( - PARALLELISM, - getRuntimeContext() - .getTaskInfo() - .getNumberOfParallelSubtasks()); - open = true; + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.getConfig().setAutoWatermarkInterval(10); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + SinkValidatorUpdaterAndChecker updaterAndChecker = + new SinkValidatorUpdaterAndChecker(numElements, 3); + env.addSource(new FailingSource(new Generator(), numElements, true)) + .rebalance() + .keyBy(x -> x.f0) + .window( + SlidingProcessingTimeWindows.of( + Duration.ofMillis(150), Duration.ofMillis(50))) + .apply( + new RichWindowFunction< + Tuple2, Tuple2, Long, TimeWindow>() { + + private boolean open = false; + + @Override + public void open(OpenContext openContext) { + assertEquals( + PARALLELISM, + getRuntimeContext() + .getTaskInfo() + .getNumberOfParallelSubtasks()); + open = true; + } + + @Override + public void apply( + Long l, + TimeWindow window, + Iterable> values, + Collector> out) { + + // validate that the function has been opened properly + assertTrue(open); + + for (Tuple2 value : values) { + assertEquals(value.f0.intValue(), value.f1.value); + out.collect(new Tuple2<>(value.f0, new IntType(1))); } + } + }) + .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, true)) + .setParallelism(1); - @Override - public void apply( - Long l, - TimeWindow window, - Iterable> values, - Collector> out) { - - // validate that the function has been opened properly - assertTrue(open); - - for (Tuple2 value : values) { - assertEquals(value.f0.intValue(), value.f1.value); - out.collect(new Tuple2<>(value.f0, new IntType(1))); - } - } - }) - .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, true)) - .setParallelism(1); - - tryExecute(env, "Sliding Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + tryExecute(env, "Sliding Window Test"); } @Test - public void testAggregatingTumblingProcessingTimeWindow() { + public void testAggregatingTumblingProcessingTimeWindow() throws Exception { final int numElements = 3000; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.getConfig().setAutoWatermarkInterval(10); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - SinkValidatorUpdaterAndChecker updaterAndChecker = - new SinkValidatorUpdaterAndChecker(numElements, 1); - env.addSource(new FailingSource(new Generator(), numElements, true)) - .map( - new MapFunction, Tuple2>() { - @Override - public Tuple2 map(Tuple2 value) { - value.f1.value = 1; - return value; - } - }) - .rebalance() - .keyBy(x -> x.f0) - .window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100))) - .reduce( - new ReduceFunction>() { - - @Override - public Tuple2 reduce( - Tuple2 a, Tuple2 b) { - return new Tuple2<>(a.f0, new IntType(1)); - } - }) - .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, true)) - .setParallelism(1); - - tryExecute(env, "Aggregating Tumbling Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.getConfig().setAutoWatermarkInterval(10); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + SinkValidatorUpdaterAndChecker updaterAndChecker = + new SinkValidatorUpdaterAndChecker(numElements, 1); + env.addSource(new FailingSource(new Generator(), numElements, true)) + .map( + new MapFunction, Tuple2>() { + @Override + public Tuple2 map(Tuple2 value) { + value.f1.value = 1; + return value; + } + }) + .rebalance() + .keyBy(x -> x.f0) + .window(TumblingProcessingTimeWindows.of(Duration.ofMillis(100))) + .reduce( + new ReduceFunction>() { + + @Override + public Tuple2 reduce( + Tuple2 a, Tuple2 b) { + return new Tuple2<>(a.f0, new IntType(1)); + } + }) + .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, true)) + .setParallelism(1); + + tryExecute(env, "Aggregating Tumbling Window Test"); } @Test - public void testAggregatingSlidingProcessingTimeWindow() { + public void testAggregatingSlidingProcessingTimeWindow() throws Exception { final int numElements = 3000; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.getConfig().setAutoWatermarkInterval(10); - env.enableCheckpointing(100); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); - SinkValidatorUpdaterAndChecker updaterAndChecker = - new SinkValidatorUpdaterAndChecker(numElements, 3); - env.addSource(new FailingSource(new Generator(), numElements, true)) - .map( - new MapFunction, Tuple2>() { - @Override - public Tuple2 map(Tuple2 value) { - value.f1.value = 1; - return value; - } - }) - .rebalance() - .keyBy(x -> x.f0) - .window( - SlidingProcessingTimeWindows.of( - Duration.ofMillis(150), Duration.ofMillis(50))) - .reduce( - new ReduceFunction>() { - @Override - public Tuple2 reduce( - Tuple2 a, Tuple2 b) { - return new Tuple2<>(a.f0, new IntType(1)); - } - }) - .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, true)) - .setParallelism(1); - - tryExecute(env, "Aggregating Sliding Window Test"); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.getConfig().setAutoWatermarkInterval(10); + env.enableCheckpointing(100); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0L); + SinkValidatorUpdaterAndChecker updaterAndChecker = + new SinkValidatorUpdaterAndChecker(numElements, 3); + env.addSource(new FailingSource(new Generator(), numElements, true)) + .map( + new MapFunction, Tuple2>() { + @Override + public Tuple2 map(Tuple2 value) { + value.f1.value = 1; + return value; + } + }) + .rebalance() + .keyBy(x -> x.f0) + .window( + SlidingProcessingTimeWindows.of( + Duration.ofMillis(150), Duration.ofMillis(50))) + .reduce( + new ReduceFunction>() { + @Override + public Tuple2 reduce( + Tuple2 a, Tuple2 b) { + return new Tuple2<>(a.f0, new IntType(1)); + } + }) + .addSink(new ValidatingSink<>(updaterAndChecker, updaterAndChecker, true)) + .setParallelism(1); + + tryExecute(env, "Aggregating Sliding Window Test"); } // ------------------------------------------------------------------------ diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java index 39bf862c0e26c..89658ec82e7a5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java @@ -141,16 +141,11 @@ public static void shutDownExistingCluster() { * completed checkpoint's. */ @Test(timeout = 60000) - public void testMultiRegionFailover() { - try { - JobGraph jobGraph = createJobGraph(); - ClusterClient client = cluster.getClusterClient(); - submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader()); - verifyAfterJobExecuted(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); - } + public void testMultiRegionFailover() throws Exception { + JobGraph jobGraph = createJobGraph(); + ClusterClient client = cluster.getClusterClient(); + submitJobAndWaitForResult(client, jobGraph, getClass().getClassLoader()); + verifyAfterJobExecuted(); } private void verifyAfterJobExecuted() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java index 62451cd39fb57..57871604682e6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java @@ -53,7 +53,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Integration test for the {@link CheckpointListener} interface. The test ensures that {@link @@ -83,74 +82,67 @@ public class StreamCheckpointNotifierITCase extends AbstractTestBaseJUnit4 { * */ @Test - public void testProgram() { - try { - final StreamExecutionEnvironment env = - StreamExecutionEnvironment.getExecutionEnvironment(); - assertEquals("test setup broken", PARALLELISM, env.getParallelism()); + public void testProgram() throws Exception { + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + assertEquals("test setup broken", PARALLELISM, env.getParallelism()); - env.enableCheckpointing(500); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L); + env.enableCheckpointing(500); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L); - final int numElements = 10000; - final int numTaskTotal = PARALLELISM * 5; + final int numElements = 10000; + final int numTaskTotal = PARALLELISM * 5; - DataStream stream = - env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal)); + DataStream stream = + env.addSource(new GeneratingSourceFunction(numElements, numTaskTotal)); - stream - // -------------- first vertex, chained to the src ---------------- - .filter(new LongRichFilterFunction()) + stream + // -------------- first vertex, chained to the src ---------------- + .filter(new LongRichFilterFunction()) - // -------------- second vertex, applying the co-map ---------------- - .connect(stream) - .flatMap(new LeftIdentityCoRichFlatMapFunction()) + // -------------- second vertex, applying the co-map ---------------- + .connect(stream) + .flatMap(new LeftIdentityCoRichFlatMapFunction()) - // -------------- third vertex - the stateful one that also fails - // ---------------- - .map(new IdentityMapFunction()) - .startNewChain() + // -------------- third vertex - the stateful one that also fails + // ---------------- + .map(new IdentityMapFunction()) + .startNewChain() - // -------------- fourth vertex - reducer and the sink ---------------- - .keyBy(x -> x.f0) - .reduce(new OnceFailingReducer(numElements)) - .sinkTo(new DiscardingSink<>()); + // -------------- fourth vertex - reducer and the sink ---------------- + .keyBy(x -> x.f0) + .reduce(new OnceFailingReducer(numElements)) + .sinkTo(new DiscardingSink<>()); - env.execute(); + env.execute(); - final long failureCheckpointID = OnceFailingReducer.failureCheckpointID; - assertNotEquals(0L, failureCheckpointID); + final long failureCheckpointID = OnceFailingReducer.failureCheckpointID; + assertNotEquals(0L, failureCheckpointID); - List[]> allLists = - Arrays.asList( - GeneratingSourceFunction.COMPLETED_CHECKPOINTS, - LongRichFilterFunction.COMPLETED_CHECKPOINTS, - LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS, - IdentityMapFunction.COMPLETED_CHECKPOINTS, - OnceFailingReducer.COMPLETED_CHECKPOINTS); + List[]> allLists = + Arrays.asList( + GeneratingSourceFunction.COMPLETED_CHECKPOINTS, + LongRichFilterFunction.COMPLETED_CHECKPOINTS, + LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS, + IdentityMapFunction.COMPLETED_CHECKPOINTS, + OnceFailingReducer.COMPLETED_CHECKPOINTS); - for (List[] parallelNotifications : allLists) { - for (List notifications : parallelNotifications) { + for (List[] parallelNotifications : allLists) { + for (List notifications : parallelNotifications) { - assertTrue( - "No checkpoint notification was received.", notifications.size() > 0); + assertTrue("No checkpoint notification was received.", notifications.size() > 0); - assertFalse( - "Failure checkpoint was marked as completed.", - notifications.contains(failureCheckpointID)); + assertFalse( + "Failure checkpoint was marked as completed.", + notifications.contains(failureCheckpointID)); - assertFalse( - "No checkpoint received after failure.", - notifications.get(notifications.size() - 1) == failureCheckpointID); + assertFalse( + "No checkpoint received after failure.", + notifications.get(notifications.size() - 1) == failureCheckpointID); - assertTrue( - "Checkpoint notification was received multiple times", - notifications.size() == new HashSet(notifications).size()); - } + assertTrue( + "Checkpoint notification was received multiple times", + notifications.size() == new HashSet(notifications).size()); } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java index a0e13ef9d50f7..61de82d41393b 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java @@ -123,28 +123,22 @@ public void shutDownExistingCluster() { */ @Test public void runCheckpointedProgram() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(PARALLELISM); + env.enableCheckpointing(500); + RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L); + + testProgram(env); + + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(PARALLELISM); - env.enableCheckpointing(500); - RestartStrategyUtils.configureFixedDelayRestartStrategy(env, Integer.MAX_VALUE, 0L); - - testProgram(env); - - JobGraph jobGraph = env.getStreamGraph().getJobGraph(); - try { - submitJobAndWaitForResult( - cluster.getClusterClient(), jobGraph, getClass().getClassLoader()); - } catch (Exception e) { - Assert.assertTrue( - ExceptionUtils.findThrowable(e, SuccessException.class).isPresent()); - } - - postSubmit(); + submitJobAndWaitForResult( + cluster.getClusterClient(), jobGraph, getClass().getClassLoader()); } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); + Assert.assertTrue(ExceptionUtils.findThrowable(e, SuccessException.class).isPresent()); } + + postSubmit(); } // -------------------------------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java index cb93a298e0405..e6917f21a8fbd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/TimestampedFileInputSplitTest.java @@ -87,16 +87,10 @@ public void testSplitComparison() { Assert.assertTrue(richThirdSplit.compareTo(richForthSplit) < 0); } - @Test + @Test(expected = IllegalArgumentException.class) public void testIllegalArgument() { - try { - new TimestampedFileInputSplit( - -10, 2, new Path("test"), 0, 100, null); // invalid modification time - } catch (Exception e) { - if (!(e instanceof IllegalArgumentException)) { - Assert.fail(e.getMessage()); - } - } + new TimestampedFileInputSplit( + -10, 2, new Path("test"), 0, 100, null); // invalid modification time } @Test diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java index 7cfab00d5e393..e005e39cff308 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/LocalExecutorITCase.java @@ -36,7 +36,6 @@ import org.apache.flink.test.testfunctions.Tokenizer; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -67,32 +66,26 @@ public void before() { } @Test(timeout = 60_000) - public void testLocalExecutorWithWordCount() throws InterruptedException { - try { - // set up the files - File inFile = File.createTempFile("wctext", ".in"); - File outFile = File.createTempFile("wctext", ".out"); - inFile.deleteOnExit(); - outFile.deleteOnExit(); - - try (FileWriter fw = new FileWriter(inFile)) { - fw.write(WordCountData.TEXT); - } - - final Configuration config = new Configuration(); - config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true); - config.set(DeploymentOptions.ATTACHED, true); - - StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile, outFile, parallelism); - JobClient jobClient = - executor.execute(wcStreamGraph, config, ClassLoader.getSystemClassLoader()) - .get(); - jobClient.getJobExecutionResult().get(); - } catch (Exception e) { - e.printStackTrace(); - Assert.fail(e.getMessage()); + public void testLocalExecutorWithWordCount() throws Exception { + // set up the files + File inFile = File.createTempFile("wctext", ".in"); + File outFile = File.createTempFile("wctext", ".out"); + inFile.deleteOnExit(); + outFile.deleteOnExit(); + + try (FileWriter fw = new FileWriter(inFile)) { + fw.write(WordCountData.TEXT); } + final Configuration config = new Configuration(); + config.set(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true); + config.set(DeploymentOptions.ATTACHED, true); + + StreamGraph wcStreamGraph = getWordCountStreamGraph(inFile, outFile, parallelism); + JobClient jobClient = + executor.execute(wcStreamGraph, config, ClassLoader.getSystemClassLoader()).get(); + jobClient.getJobExecutionResult().get(); + assertThat(miniCluster.isRunning(), is(false)); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java index 124348ac5a4c8..c1ebd8fea0d51 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/io/InputOutputITCase.java @@ -24,8 +24,6 @@ import org.apache.flink.streaming.api.functions.sink.legacy.OutputFormatSinkFunction; import org.apache.flink.test.util.JavaProgramTestBaseJUnit4; -import static org.junit.Assert.fail; - /** * Tests for non rich DataSource and DataSink input output formats being correctly used at runtime. */ @@ -38,12 +36,7 @@ protected void testProgram() throws Exception { TestNonRichOutputFormat output = new TestNonRichOutputFormat(); env.createInput(new TestNonRichInputFormat()) .addSink(new OutputFormatSinkFunction<>(output)); - try { - env.execute(); - } catch (Exception e) { - // we didn't break anything by making everything rich. - e.printStackTrace(); - fail(e.getMessage()); - } + env.execute(); + // we didn't break anything by making everything rich. } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java index 720b99cb5cd57..32281cdcfb30f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/CustomSerializationITCase.java @@ -40,7 +40,6 @@ import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; /** * Test for proper error messages in case user-defined serialization is broken and detected in the @@ -67,7 +66,7 @@ public static Configuration getConfiguration() { } @Test - public void testIncorrectSerializer1() { + public void testIncorrectSerializer1() throws Exception { try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARLLELISM); @@ -93,14 +92,11 @@ public ConsumesTooMuch map(Long value) throws Exception { .getMessage() .contains("broken serialization.")) .isPresent()); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } } @Test - public void testIncorrectSerializer2() { + public void testIncorrectSerializer2() throws Exception { try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARLLELISM); @@ -126,14 +122,11 @@ public ConsumesTooMuchSpanning map(Long value) throws Exception { .getMessage() .contains("broken serialization.")) .isPresent()); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } } @Test - public void testIncorrectSerializer3() { + public void testIncorrectSerializer3() throws Exception { try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARLLELISM); @@ -159,14 +152,11 @@ public ConsumesTooLittle map(Long value) throws Exception { .getMessage() .contains("broken serialization.")) .isPresent()); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } } @Test - public void testIncorrectSerializer4() { + public void testIncorrectSerializer4() throws Exception { try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(PARLLELISM); @@ -192,9 +182,6 @@ public ConsumesTooLittleSpanning map(Long value) throws Exception { .getMessage() .contains("broken serialization.")) .isPresent()); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java index 6febe2ea47310..c883564848695 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/misc/MiscellaneousIssuesITCase.java @@ -63,87 +63,70 @@ public class MiscellaneousIssuesITCase extends TestLogger { .build()); @Test - public void testNullValues() { - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(1); - - DataStream data = - env.fromData("hallo") - .map( - new MapFunction() { - @Override - public String map(String value) throws Exception { - return null; - } - }); - data.sinkTo( - FileSink.forRowFormat( - new Path("/tmp/myTest"), new SimpleStringEncoder()) - .build()); + public void testNullValues() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(1); + + DataStream data = + env.fromData("hallo") + .map( + new MapFunction() { + @Override + public String map(String value) throws Exception { + return null; + } + }); + data.sinkTo( + FileSink.forRowFormat(new Path("/tmp/myTest"), new SimpleStringEncoder()) + .build()); - try { - env.execute(); - fail("this should fail due to null values."); - } catch (JobExecutionException e) { - assertTrue(findThrowable(e, NullPointerException.class).isPresent()); - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + try { + env.execute(); + fail("this should fail due to null values."); + } catch (JobExecutionException e) { + assertTrue(findThrowable(e, NullPointerException.class).isPresent()); } } @Test - public void testDisjointDataflows() { - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(5); + public void testDisjointDataflows() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(5); - // generate two different flows - env.fromSequence(1, 10).sinkTo(new DiscardingSink<>()); - env.fromSequence(1, 10).sinkTo(new DiscardingSink<>()); - - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + // generate two different flows + env.fromSequence(1, 10).sinkTo(new DiscardingSink<>()); + env.fromSequence(1, 10).sinkTo(new DiscardingSink<>()); } @Test - public void testAccumulatorsAfterNoOp() { + public void testAccumulatorsAfterNoOp() throws Exception { final String accName = "test_accumulator"; - try { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(6); - - env.fromSequence(1, 1000000) - .rebalance() - .flatMap( - new RichFlatMapFunction() { - - private LongCounter counter; - - @Override - public void open(OpenContext openContext) { - counter = getRuntimeContext().getLongCounter(accName); - } - - @Override - public void flatMap(Long value, Collector out) { - counter.add(1L); - } - }) - .sinkTo(new DiscardingSink<>()); - - JobExecutionResult result = env.execute(); - - assertEquals(1000000L, result.getAllAccumulatorResults().get(accName)); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(6); + + env.fromSequence(1, 1000000) + .rebalance() + .flatMap( + new RichFlatMapFunction() { + + private LongCounter counter; + + @Override + public void open(OpenContext openContext) { + counter = getRuntimeContext().getLongCounter(accName); + } + + @Override + public void flatMap(Long value, Collector out) { + counter.add(1L); + } + }) + .sinkTo(new DiscardingSink<>()); + + JobExecutionResult result = env.execute(); + + assertEquals(1000000L, result.getAllAccumulatorResults().get(accName)); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java index 7a6e353d9d65a..b5389bf03a263 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java @@ -226,13 +226,11 @@ public void run() { // all seems well :-) } catch (Exception e) { - e.printStackTrace(); printProcessLog("TaskManager 1", taskManagerProcess1); printProcessLog("TaskManager 2", taskManagerProcess2); printProcessLog("TaskManager 3", taskManagerProcess3); - fail(e.getMessage()); + throw e; } catch (Error e) { - e.printStackTrace(); printProcessLog("TaskManager 1", taskManagerProcess1); printProcessLog("TaskManager 2", taskManagerProcess2); printProcessLog("TaskManager 3", taskManagerProcess3); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java index a5d0215a959c0..99eb787731ebd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/IPv6HostnamesITCase.java @@ -51,8 +51,6 @@ import java.util.Enumeration; import java.util.List; -import static org.junit.Assert.fail; - /** Test proper handling of IPv6 address literals in URLs. */ @SuppressWarnings("serial") public class IPv6HostnamesITCase extends TestLogger { @@ -83,52 +81,44 @@ private Configuration getConfiguration() { } @Test - public void testClusterWithIPv6host() { - try { - - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setParallelism(4); - - // get input data - DataStream text = env.fromData(WordCountData.TEXT.split("\n")); - - DataStream> counts = - text.flatMap( - new FlatMapFunction>() { - @Override - public void flatMap( - String value, - Collector> out) - throws Exception { - for (String token : value.toLowerCase().split("\\W+")) { - if (token.length() > 0) { - out.collect( - new Tuple2(token, 1)); - } + public void testClusterWithIPv6host() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(4); + + // get input data + DataStream text = env.fromData(WordCountData.TEXT.split("\n")); + + DataStream> counts = + text.flatMap( + new FlatMapFunction>() { + @Override + public void flatMap( + String value, Collector> out) + throws Exception { + for (String token : value.toLowerCase().split("\\W+")) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); } } - }) - .keyBy(x -> x.f0) - .window(GlobalWindows.createWithEndOfStreamTrigger()) - .reduce( - new ReduceFunction>() { - @Override - public Tuple2 reduce( - Tuple2 value1, - Tuple2 value2) - throws Exception { - return Tuple2.of(value1.f0, value1.f1 + value2.f1); - } - }); - - List> result = - CollectionUtil.iteratorToList(counts.executeAndCollect()); - - TestBaseUtils.compareResultAsText(result, WordCountData.COUNTS_AS_TUPLES); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + } + }) + .keyBy(x -> x.f0) + .window(GlobalWindows.createWithEndOfStreamTrigger()) + .reduce( + new ReduceFunction>() { + @Override + public Tuple2 reduce( + Tuple2 value1, + Tuple2 value2) + throws Exception { + return Tuple2.of(value1.f0, value1.f1 + value2.f1); + } + }); + + List> result = + CollectionUtil.iteratorToList(counts.executeAndCollect()); + + TestBaseUtils.compareResultAsText(result, WordCountData.COUNTS_AS_TUPLES); } private Inet6Address getLocalIPv6Address() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java index f31dc9e737070..7019f77504402 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/state/StateHandleSerializationTest.java @@ -36,22 +36,17 @@ public class StateHandleSerializationTest { @Test - public void ensureStateHandlesHaveSerialVersionUID() { - try { - Reflections reflections = new Reflections("org.apache.flink"); + public void ensureStateHandlesHaveSerialVersionUID() throws Exception { + Reflections reflections = new Reflections("org.apache.flink"); - // check all state handles + // check all state handles - @SuppressWarnings("unchecked") - Set> stateHandleImplementations = - (Set>) (Set) reflections.getSubTypesOf(StateObject.class); + @SuppressWarnings("unchecked") + Set> stateHandleImplementations = + (Set>) (Set) reflections.getSubTypesOf(StateObject.class); - for (Class clazz : stateHandleImplementations) { - validataSerialVersionUID(clazz); - } - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); + for (Class clazz : stateHandleImplementations) { + validataSerialVersionUID(clazz); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java index 6c60f12b705da..736670f6283f2 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/PartitionerITCase.java @@ -80,7 +80,7 @@ public void testForwardFailsHightToLowParallelism() throws Exception { } @Test - public void partitionerTest() { + public void partitionerTest() throws Exception { TestListResultSink> hashPartitionResultSink = new TestListResultSink>(); @@ -145,12 +145,7 @@ public Tuple1 map(Tuple1 value) throws Exception { // partition global src.global().map(new SubtaskIndexAssigner()).addSink(globalPartitionResultSink); - try { - env.execute(); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + env.execute(); List> hashPartitionResult = hashPartitionResultSink.getResult(); List> customPartitionResult = customPartitionResultSink.getResult();