1
1
package com .codenotfound .kafka .consumer ;
2
2
3
3
import static org .assertj .core .api .Assertions .assertThat ;
4
-
5
4
import java .util .Map ;
6
5
import java .util .concurrent .TimeUnit ;
7
-
8
6
import org .junit .Before ;
9
7
import org .junit .ClassRule ;
10
8
import org .junit .Test ;
18
16
import org .springframework .kafka .core .KafkaTemplate ;
19
17
import org .springframework .kafka .core .ProducerFactory ;
20
18
import org .springframework .kafka .listener .MessageListenerContainer ;
21
- import org .springframework .kafka .test .rule .KafkaEmbedded ;
19
+ import org .springframework .kafka .test .rule .EmbeddedKafkaRule ;
22
20
import org .springframework .kafka .test .utils .ContainerTestUtils ;
23
21
import org .springframework .kafka .test .utils .KafkaTestUtils ;
24
22
import org .springframework .test .annotation .DirtiesContext ;
29
27
@ DirtiesContext
30
28
public class SpringKafkaReceiverTest {
31
29
32
- private static final Logger LOGGER = LoggerFactory .getLogger (SpringKafkaReceiverTest .class );
30
+ private static final Logger LOGGER =
31
+ LoggerFactory .getLogger (SpringKafkaReceiverTest .class );
33
32
34
33
private static String RECEIVER_TOPIC = "receiver.t" ;
35
34
@@ -42,17 +41,20 @@ public class SpringKafkaReceiverTest {
42
41
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry ;
43
42
44
43
@ ClassRule
45
- public static KafkaEmbedded embeddedKafka = new KafkaEmbedded (1 , true , RECEIVER_TOPIC );
44
+ public static EmbeddedKafkaRule embeddedKafka =
45
+ new EmbeddedKafkaRule (1 , true , RECEIVER_TOPIC );
46
46
47
47
@ Before
48
48
public void setUp () throws Exception {
49
49
// set up the Kafka producer properties
50
50
Map <String , Object > senderProperties =
51
- KafkaTestUtils .senderProps (embeddedKafka .getBrokersAsString ());
51
+ KafkaTestUtils .senderProps (
52
+ embeddedKafka .getEmbeddedKafka ().getBrokersAsString ());
52
53
53
54
// create a Kafka producer factory
54
55
ProducerFactory <String , String > producerFactory =
55
- new DefaultKafkaProducerFactory <String , String >(senderProperties );
56
+ new DefaultKafkaProducerFactory <String , String >(
57
+ senderProperties );
56
58
57
59
// create a Kafka template
58
60
template = new KafkaTemplate <>(producerFactory );
@@ -63,7 +65,7 @@ public void setUp() throws Exception {
63
65
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry
64
66
.getListenerContainers ()) {
65
67
ContainerTestUtils .waitForAssignment (messageListenerContainer ,
66
- embeddedKafka .getPartitionsPerTopic ());
68
+ embeddedKafka .getEmbeddedKafka (). getPartitionsPerTopic ());
67
69
}
68
70
}
69
71
0 commit comments