10
10
import com .devshawn .kafka .gitops .domain .state .DesiredState ;
11
11
import com .devshawn .kafka .gitops .domain .state .TopicDetails ;
12
12
import com .devshawn .kafka .gitops .enums .PlanAction ;
13
+ import com .devshawn .kafka .gitops .exception .PlanIsUpToDateException ;
13
14
import com .devshawn .kafka .gitops .exception .ReadPlanInputException ;
14
15
import com .devshawn .kafka .gitops .exception .WritePlanOutputException ;
15
16
import com .devshawn .kafka .gitops .service .KafkaService ;
16
- import com .devshawn .kafka .gitops .util .LogUtil ;
17
17
import com .devshawn .kafka .gitops .util .PlanUtil ;
18
18
import com .fasterxml .jackson .databind .ObjectMapper ;
19
+ import org .apache .kafka .clients .admin .Config ;
19
20
import org .apache .kafka .clients .admin .ConfigEntry ;
20
- import org .apache .kafka .clients .admin .TopicDescription ;
21
21
import org .apache .kafka .clients .admin .TopicListing ;
22
22
import org .apache .kafka .common .acl .AclBinding ;
23
+ import org .apache .kafka .common .config .ConfigResource ;
23
24
import org .slf4j .LoggerFactory ;
24
25
25
26
import java .io .FileNotFoundException ;
26
27
import java .io .FileWriter ;
27
28
import java .io .IOException ;
29
+ import java .util .ArrayList ;
28
30
import java .util .HashMap ;
29
31
import java .util .List ;
30
32
import java .util .Map ;
@@ -46,21 +48,21 @@ public PlanManager(ManagerConfig managerConfig, KafkaService kafkaService, Objec
46
48
47
49
public void planTopics (DesiredState desiredState , DesiredPlan .Builder desiredPlan ) {
48
50
List <TopicListing > topics = kafkaService .getTopics ();
51
+ List <String > topicNames = topics .stream ().map (TopicListing ::name ).collect (Collectors .toList ());
52
+ Map <String , List <ConfigEntry >> topicConfigs = fetchTopicConfigurations (topicNames );
49
53
50
54
desiredState .getTopics ().forEach ((key , value ) -> {
51
55
TopicPlan .Builder topicPlan = new TopicPlan .Builder ()
52
56
.setName (key )
53
57
.setTopicDetails (value );
54
58
55
- TopicDescription topicDescription = kafkaService .describeTopic (key );
56
-
57
- if (topicDescription == null ) {
59
+ if (!topicNames .contains (key )) {
58
60
log .info ("[PLAN] Topic {} does not exist; it will be created." , key );
59
61
topicPlan .setAction (PlanAction .ADD );
60
62
} else {
61
63
log .info ("[PLAN] Topic {} exists, it will not be created." , key );
62
64
topicPlan .setAction (PlanAction .NO_CHANGE );
63
- planTopicConfigurations (key , value , topicPlan );
65
+ planTopicConfigurations (key , value , topicConfigs . get ( key ), topicPlan );
64
66
}
65
67
66
68
desiredPlan .addTopicPlans (topicPlan .build ());
@@ -84,10 +86,9 @@ public void planTopics(DesiredState desiredState, DesiredPlan.Builder desiredPla
84
86
});
85
87
}
86
88
87
- private void planTopicConfigurations (String topicName , TopicDetails topicDetails , TopicPlan .Builder topicPlan ) {
89
+ private void planTopicConfigurations (String topicName , TopicDetails topicDetails , List < ConfigEntry > configs , TopicPlan .Builder topicPlan ) {
88
90
Map <String , TopicConfigPlan > configPlans = new HashMap <>();
89
- List <ConfigEntry > currentConfigs = kafkaService .describeTopicConfigs (topicName );
90
- List <ConfigEntry > customConfigs = currentConfigs .stream ()
91
+ List <ConfigEntry > customConfigs = configs .stream ()
91
92
.filter (it -> it .source () == ConfigEntry .ConfigSource .DYNAMIC_TOPIC_CONFIG )
92
93
.collect (Collectors .toList ());
93
94
@@ -175,8 +176,7 @@ public void planAcls(DesiredState desiredState, DesiredPlan.Builder desiredPlan)
175
176
public void validatePlanHasChanges (DesiredPlan desiredPlan , boolean deleteDisabled ) {
176
177
PlanOverview planOverview = PlanUtil .getOverview (desiredPlan , deleteDisabled );
177
178
if (planOverview .getAdd () == 0 && planOverview .getUpdate () == 0 && planOverview .getRemove () == 0 ) {
178
- LogUtil .printNoChangesMessage ();
179
- System .exit (0 );
179
+ throw new PlanIsUpToDateException ();
180
180
}
181
181
}
182
182
@@ -206,4 +206,11 @@ public void writePlanToFile(DesiredPlan desiredPlan) {
206
206
}
207
207
}
208
208
}
209
+
210
+ private Map <String , List <ConfigEntry >> fetchTopicConfigurations (List <String > topicNames ) {
211
+ Map <String , List <ConfigEntry >> map = new HashMap <>();
212
+ Map <ConfigResource , Config > configs = kafkaService .describeConfigsForTopics (topicNames );
213
+ configs .forEach ((key , value ) -> map .put (key .name (), new ArrayList <ConfigEntry >(value .entries ())));
214
+ return map ;
215
+ }
209
216
}
0 commit comments