|
2 | 2 |
|
3 | 3 | import com.devshawn.kafka.gitops.exception.MissingConfigurationException;
|
4 | 4 | import org.apache.kafka.clients.CommonClientConfigs;
|
| 5 | +import org.apache.kafka.common.config.SaslConfigs; |
5 | 6 | import org.slf4j.LoggerFactory;
|
6 | 7 |
|
7 | 8 | import java.util.HashMap;
|
@@ -56,22 +57,19 @@ private static void handleDefaultConfig(Map<String, Object> config) {
|
56 | 57 |
|
57 | 58 | private static void handleAuthentication(AtomicReference<String> username, AtomicReference<String> password, Map<String, Object> config) {
|
58 | 59 | if (username.get() != null && password.get() != null) {
|
59 |
| - |
60 | 60 | // Do we need the Plain or SCRAM module?
|
61 | 61 | String loginModule = null;
|
62 |
| - if (config.get("sasl.mechanism").equals("PLAIN")) { |
| 62 | + if (config.get(SaslConfigs.SASL_MECHANISM).equals("PLAIN")) { |
63 | 63 | loginModule = "org.apache.kafka.common.security.plain.PlainLoginModule";
|
64 |
| - } |
65 |
| - else if (config.get("sasl.mechanism").equals("SCRAM-SHA-256")) { |
| 64 | + } else if (config.get(SaslConfigs.SASL_MECHANISM).equals("SCRAM-SHA-256")) { |
66 | 65 | loginModule = "org.apache.kafka.common.security.scram.ScramLoginModule";
|
67 |
| - } |
68 |
| - else { |
69 |
| - throw new MissingConfigurationException("KAFKA_SASL_MECHANISM"); |
| 66 | + } else { |
| 67 | + throw new MissingConfigurationException("KAFKA_SASL_MECHANISM"); |
70 | 68 | }
|
71 | 69 |
|
72 | 70 | String value = String.format("%s required username=\"%s\" password=\"%s\";",
|
73 | 71 | loginModule, username.get(), password.get());
|
74 |
| - config.put("sasl.jaas.config", value); |
| 72 | + config.put(SaslConfigs.SASL_JAAS_CONFIG, value); |
75 | 73 | } else if (username.get() != null) {
|
76 | 74 | throw new MissingConfigurationException("KAFKA_SASL_JAAS_PASSWORD");
|
77 | 75 | } else if (password.get() != null) {
|
|
0 commit comments