2727import org .apache .flink .configuration .Configuration ;
2828import org .apache .flink .configuration .ExternalizedCheckpointRetention ;
2929import org .apache .flink .configuration .MemorySize ;
30+ import org .apache .flink .configuration .RestartStrategyOptions ;
3031import org .apache .flink .configuration .StateRecoveryOptions ;
3132import org .apache .flink .configuration .TaskManagerOptions ;
3233import org .apache .flink .connector .datagen .source .DataGeneratorSource ;
5758import java .util .List ;
5859import java .util .Random ;
5960
61+ import static org .apache .flink .configuration .RestartStrategyOptions .RestartStrategyType .NO_RESTART_STRATEGY ;
62+
6063/**
6164 * Integration test for rescaling jobs with mixed (UC-supported and UC-unsupported) exchanges from
6265 * an unaligned checkpoint.
@@ -80,7 +83,8 @@ public static Collection<ExecuteJobViaEnv> parameter() {
8083 UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMultiOutputDAG ,
8184 UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMultiInputDAG ,
8285 UnalignedCheckpointRescaleWithMixedExchangesITCase ::createRescalePartitionerDAG ,
83- UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMixedComplexityDAG );
86+ UnalignedCheckpointRescaleWithMixedExchangesITCase ::createMixedComplexityDAG ,
87+ UnalignedCheckpointRescaleWithMixedExchangesITCase ::createPartEmptyHashExchangeDAG );
8488 }
8589
8690 @ Before
@@ -137,6 +141,7 @@ private StreamExecutionEnvironment getUnalignedCheckpointEnv(@Nullable String re
137141 conf .set (CheckpointingOptions .CHECKPOINTING_INTERVAL , Duration .ofSeconds (1 ));
138142 // Disable aligned timeout to ensure it works with unaligned checkpoint directly
139143 conf .set (CheckpointingOptions .ALIGNED_CHECKPOINT_TIMEOUT , Duration .ofSeconds (0 ));
144+ conf .set (RestartStrategyOptions .RESTART_STRATEGY , NO_RESTART_STRATEGY .getMainValue ());
140145 conf .set (
141146 CheckpointingOptions .EXTERNALIZED_CHECKPOINT_RETENTION ,
142147 ExternalizedCheckpointRetention .RETAIN_ON_CANCELLATION );
@@ -336,6 +341,53 @@ private static JobClient createMixedComplexityDAG(StreamExecutionEnvironment env
336341 return env .executeAsync ();
337342 }
338343
344+ /**
345+ * Creates a DAG where the downstream MapAfterKeyBy task receives input from two hash exchanges:
346+ * one with actual data and one that is empty due to filtering. This tests unaligned checkpoint
347+ * rescaling with mixed empty and non-empty hash partitions.
348+ */
349+ private static JobClient createPartEmptyHashExchangeDAG (StreamExecutionEnvironment env )
350+ throws Exception {
351+ int source1Parallelism = getRandomParallelism ();
352+ DataGeneratorSource <Long > source1 =
353+ new DataGeneratorSource <>(
354+ index -> index ,
355+ Long .MAX_VALUE ,
356+ RateLimiterStrategy .perSecond (5000 ),
357+ Types .LONG );
358+ DataStream <Long > sourceStream1 =
359+ env .fromSource (source1 , WatermarkStrategy .noWatermarks (), "Source 1" )
360+ .setParallelism (source1Parallelism );
361+
362+ int source2Parallelism = getRandomParallelism ();
363+ DataGeneratorSource <Long > source2 =
364+ new DataGeneratorSource <>(
365+ index -> index ,
366+ Long .MAX_VALUE ,
367+ RateLimiterStrategy .perSecond (5000 ),
368+ Types .LONG );
369+
370+ // Filter all records to simulate empty state exchange
371+ DataStream <Long > sourceStream2 =
372+ env .fromSource (source2 , WatermarkStrategy .noWatermarks (), "Source 2" )
373+ .setParallelism (source2Parallelism )
374+ .filter (value -> false )
375+ .setParallelism (source2Parallelism );
376+
377+ sourceStream1
378+ .union (sourceStream2 )
379+ .keyBy ((KeySelector <Long , Long >) value -> value )
380+ .map (
381+ x -> {
382+ Thread .sleep (5 );
383+ return x ;
384+ })
385+ .name ("MapAfterKeyBy" )
386+ .setParallelism (getRandomParallelism ());
387+
388+ return env .executeAsync ();
389+ }
390+
339391 private static int getRandomParallelism () {
340392 return RANDOM .nextInt (MAX_SLOTS ) + 1 ;
341393 }
0 commit comments