Security is crucial to today's businesses because of cyber attacks and data breaches. Kafka did not support built-in security before version 0.9. Although it was possible to lock down access at the network level, that is not recommended for a large, shared, multi-tenant cluster.
There are different ways to secure a Kafka cluster. Let's see how to secure a Kafka cluster using Transport Layer Security (TLS) authentication.
For client/broker and inter-broker communication, do the following:
- Use TLS or Kerberos authentication.
- Encrypt network traffic via TLS.
- Perform authorization via access control lists (ACLs).
The sections below explain how to apply TLS authentication to Kafka brokers, producers, and consumers.
Before you begin, generate a key and certificate for each broker and client in the cluster. The common name (CN) of the broker certificate must match the fully qualified domain name (FQDN) of the server, because the client compares the CN with the DNS name of the host it connects to. This check ensures that the client is talking to the intended broker rather than a malicious one.
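One way to check the CN before deploying a certificate is to read its subject with openssl and compare it with the broker's FQDN. The following is a minimal sketch that assumes the openssl command-line tool is installed. The helper name cert_cn, the host name broker1.example.com, and the throwaway self-signed certificate are hypothetical and exist only for the demonstration.

```shell
#!/bin/bash
# Print the common name (CN) from the subject of a PEM certificate.
# cert_cn is a hypothetical helper name, not part of Kafka or openssl.
cert_cn() {
  openssl x509 -in "$1" -noout -subject -nameopt RFC2253 \
    | sed -n 's/.*CN=\([^,]*\).*/\1/p'
}

# Demo only: create a throwaway self-signed certificate for an assumed FQDN.
expected_fqdn="broker1.example.com"
demo_dir=$(mktemp -d)
openssl req -x509 -newkey rsa:2048 -nodes -days 1 \
  -subj "/CN=${expected_fqdn}" \
  -keyout "${demo_dir}/key.pem" -out "${demo_dir}/cert.pem" 2>/dev/null

# Compare the CN in the certificate with the FQDN the clients will use.
if [ "$(cert_cn "${demo_dir}/cert.pem")" = "${expected_fqdn}" ]; then
  echo "CN matches ${expected_fqdn}"
else
  echo "CN does not match ${expected_fqdn}" >&2
fi
```

For a real broker, you would point cert_cn at the broker's exported certificate and compare the result with the host name that clients use in their bootstrap server list.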
Now that each broker has a public-private key pair and an unsigned certificate to identify itself, it is important for each certificate to be signed by a certificate authority (CA) to prevent forged certificates. As long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to authentic brokers.
In contrast to the keystore, which stores each application’s identity, the truststore stores all the certificates that the application should trust. Importing a certificate into a truststore also means trusting all certificates that are signed by that certificate. This attribute is called the chain of trust, and it is particularly useful when deploying TLS on a large Kafka cluster. You can sign all certificates in the cluster with a single CA and have all the machines share the same truststore that contains the CA certificate. That way, all machines can authenticate all other machines. A slightly more complex alternative is to use two CAs, one to sign brokers’ keys and another to sign clients’ keys.
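The chain of trust described above can be illustrated with openssl alone. The sketch below assumes the openssl command-line tool is installed and works entirely in a temporary directory. All file names and certificate subjects (demo-ca, broker1.example.com, rogue.example.com) are made up for the demonstration and are not the files used later in this guide.

```shell
#!/bin/bash
# Demo of the chain of trust using throwaway files in a temporary directory.
# All file names and subjects below are illustrative assumptions.
d=$(mktemp -d)

# 1. Create a CA: a key pair plus a self-signed certificate.
openssl req -new -x509 -nodes -days 1 -subj "/CN=demo-ca" \
  -keyout "$d/ca-key" -out "$d/ca-cert" 2>/dev/null

# 2. Create a broker key and a certificate signing request (CSR).
openssl req -new -newkey rsa:2048 -nodes -subj "/CN=broker1.example.com" \
  -keyout "$d/broker-key" -out "$d/broker-csr" 2>/dev/null

# 3. Sign the CSR with the CA.
openssl x509 -req -CA "$d/ca-cert" -CAkey "$d/ca-key" -CAcreateserial \
  -in "$d/broker-csr" -out "$d/broker-cert" -days 1 2>/dev/null

# 4. Trusting only the CA certificate is enough to accept the broker certificate.
if openssl verify -CAfile "$d/ca-cert" "$d/broker-cert" >/dev/null 2>&1; then
  chain_ok=yes
else
  chain_ok=no
fi

# 5. A certificate that the CA did not sign is rejected.
openssl req -x509 -newkey rsa:2048 -nodes -days 1 -subj "/CN=rogue.example.com" \
  -keyout "$d/rogue-key" -out "$d/rogue-cert" 2>/dev/null
if openssl verify -CAfile "$d/ca-cert" "$d/rogue-cert" >/dev/null 2>&1; then
  rogue_ok=yes
else
  rogue_ok=no
fi

echo "certificate signed by the CA accepted: ${chain_ok}"
echo "certificate not signed by the CA accepted: ${rogue_ok}"
```

The same idea is what lets every machine in the cluster share one truststore: only the CA certificate has to be distributed, not each broker or client certificate.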
Let's see how you can generate your own CA, which is simply a public-private key pair and certificate. Then you can add the CA certificate to each client and broker’s truststore.
The following bash script generates the keystore and truststore for brokers (kafka.server.keystore.jks and kafka.server.truststore.jks) and clients (kafka.client.keystore.jks and kafka.client.truststore.jks):
createCertificates
#!/bin/bash
PASSWORD=test1234
VALIDITY=365

# Step 1: Generate a key pair and an unsigned certificate for the broker.
keytool -keystore kafka.server.keystore.jks -alias localhost -validity "$VALIDITY" -genkey -keyalg RSA

# Step 2: Create the CA (a key pair and a self-signed certificate).
# Enter the value of PASSWORD as the CA key passphrase; the signing steps below rely on it.
openssl req -new -x509 -keyout ca-key -out ca-cert -days "$VALIDITY"

# Step 3: Add the CA certificate to the broker and client truststores.
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert

# Step 4: Export a signing request for the broker, sign it with the CA,
# and import the CA certificate and the signed certificate into the broker keystore.
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed \
  -days "$VALIDITY" -CAcreateserial -passin "pass:$PASSWORD"
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed

# Step 5: Repeat the key generation, signing, and import steps for the client keystore.
keytool -keystore kafka.client.keystore.jks -alias localhost -validity "$VALIDITY" -genkey -keyalg RSA
keytool -keystore kafka.client.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed \
  -days "$VALIDITY" -CAcreateserial -passin "pass:$PASSWORD"
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -import -file cert-signed
- Configure the required security protocols and ports in the <KAFKA_HOME>/config/server.properties file:
listeners=SSL://:9093
Note: Do not enable an unsecured (PLAINTEXT) port. Ensure that all broker/client and inter-broker network communication is encrypted. Select SSL as the security protocol for inter-broker communication. SASL_SSL is the other possible option for the configured listeners:
security.inter.broker.protocol=SSL
Info: It is difficult to upgrade all systems to the new secure clients at the same time. Therefore, you can support a mix of secured and unsecured clients by adding a PLAINTEXT port to listeners. Be sure to restrict access to this port to trusted clients only, for example by using network segmentation and/or authorization ACLs to limit access to trusted IPs.
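Because an unencrypted listener is easy to leave behind after a migration, it can help to check the broker configuration for one. The sketch below is one possible check, not an official Kafka tool. The helper name has_unencrypted_listener is hypothetical, and the check assumes that listener names in the listeners property are the protocol names themselves (that is, no custom listener names are mapped to protocols elsewhere in the file).

```shell
#!/bin/bash
# Succeeds (exit status 0) when the listeners property exposes an unencrypted
# protocol, PLAINTEXT or SASL_PLAINTEXT.
# has_unencrypted_listener is a hypothetical helper name.
has_unencrypted_listener() {
  local config_file=$1
  grep -E '^[[:space:]]*listeners[[:space:]]*=' "$config_file" \
    | grep -Eq '(=|,)[[:space:]]*(SASL_)?PLAINTEXT://'
}

# Demo on a temporary stand-in for <KAFKA_HOME>/config/server.properties.
demo_config=$(mktemp)
printf 'listeners=SSL://:9093\nsecurity.inter.broker.protocol=SSL\n' > "$demo_config"

if has_unencrypted_listener "$demo_config"; then
  echo "WARNING: unencrypted listener configured; restrict it to trusted clients"
else
  echo "OK: all configured listeners are encrypted"
fi
```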
Now, let's see how to apply protocol-specific configuration settings.
- Configure the following in the <KAFKA_HOME>/config/server.properties file:
TLS configuration in Broker:
ssl.client.auth=required
ssl.keystore.location={file-path}/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location={file-path}/kafka.server.truststore.jks
ssl.truststore.password=test1234
The above configuration enables TLS client authentication and specifies the key password as well as the keystore and truststore details. Because the passwords are stored in the broker configuration, it is important to restrict access to that file via filesystem permissions.
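As a rough illustration of restricting access with filesystem permissions, the sketch below limits a file to its owner. It runs against a temporary stand-in file rather than the real <KAFKA_HOME>/config/server.properties, and it assumes a Linux system where stat accepts the GNU coreutils -c option. In practice the owner would be the account that runs the broker.

```shell
#!/bin/bash
# Demo on a temporary stand-in file; in practice the target would be
# <KAFKA_HOME>/config/server.properties, owned by the account that runs the broker.
demo_config=$(mktemp)
printf 'ssl.keystore.password=test1234\n' > "$demo_config"

# Simulate an overly permissive file, then lock it down.
chmod 644 "$demo_config"
chmod 600 "$demo_config"   # owner may read and write; group and others get no access

# Read back the resulting mode (stat -c is the GNU coreutils form).
config_mode=$(stat -c '%a' "$demo_config")
echo "mode of ${demo_config}: ${config_mode}"
```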
You enable TLS authentication for Kafka producers and consumers by configuring a set of parameters. It does not require any code changes.
TLS is supported in Kafka versions 0.9.0.0 and above, and only by the new producer and consumer APIs. The older APIs do not support TLS.
The parameters you specify to support TLS are the same for both producers and consumers. Because you are using mutual authentication, specify the security protocol as well as the truststore and keystore information.
The client configuration can differ slightly depending on whether you want the client to use TLS or SASL/Kerberos.
- Use the following proxy service configuration, which has Kafka security enabled, to create a producer that sends messages to the broker:
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="testKafka"
       startOnLoad="true"
       statistics="disable"
       trace="disable"
       transports="http,https">
   <target>
      <inSequence>
         <kafkaTransport.init>
            <bootstrapServers>localhost:9093</bootstrapServers>
            <keySerializerClass>org.apache.kafka.common.serialization.StringSerializer</keySerializerClass>
            <valueSerializerClass>org.apache.kafka.common.serialization.StringSerializer</valueSerializerClass>
            <securityProtocol>SSL</securityProtocol>
            <sslTruststoreLocation>{file-path}/kafka.client.truststore.jks</sslTruststoreLocation>
            <sslTruststorePassword>test1234</sslTruststorePassword>
            <sslKeystoreLocation>{file-path}/kafka.client.keystore.jks</sslKeystoreLocation>
            <sslKeystorePassword>test1234</sslKeystorePassword>
            <sslKeyPassword>test1234</sslKeyPassword>
         </kafkaTransport.init>
         <kafkaTransport.publishMessages>
            <topic>test</topic>
         </kafkaTransport.publishMessages>
      </inSequence>
   </target>
   <description/>
</proxy>
The console producer is a convenient way to send a small amount of data to the broker.
- Follow the sample scenario in Publishing Messages using Kafka_2.12-0.11.0.0 and send the following message to the Kafka broker:
{"test":"wso2"} {"test":"wso2"} {"test":"wso2"}
Note: Be sure to include the following configuration in the proxy service when you are building the sample:
bootstrap.servers=localhost:9093
security.protocol=SSL
ssl.truststore.location={file-path}/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location={file-path}/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
The console consumer is a convenient way to consume messages. You can either use the console consumer or the Kafka inbound endpoint to consume messages.
- Execute the following command using the terminal to start the consumer with security:
bin/kafka-console-consumer --bootstrap-server localhost:9093 --topic test --new-consumer --from-beginning --consumer.config {file-path}/consumer_ssl.properties
Include the following configuration in consumer_ssl.properties to enable security:
bootstrap.servers=localhost:9093
security.protocol=SSL
ssl.truststore.location={file-path}/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location={file-path}/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
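If you prefer to generate this file instead of editing the {file-path} placeholders by hand, a small helper along the following lines could be used. This is only a sketch: the function name write_client_ssl_properties and the demo directory /path/to/stores are hypothetical, and the property values mirror the ones listed above.

```shell
#!/bin/bash
# Write a client TLS properties file with the settings listed above.
# write_client_ssl_properties is a hypothetical helper name.
# Arguments: <store-directory> <password> <output-file>
write_client_ssl_properties() {
  local store_dir=$1 store_password=$2 output_file=$3
  cat > "$output_file" <<EOF
bootstrap.servers=localhost:9093
security.protocol=SSL
ssl.truststore.location=${store_dir}/kafka.client.truststore.jks
ssl.truststore.password=${store_password}
ssl.keystore.location=${store_dir}/kafka.client.keystore.jks
ssl.keystore.password=${store_password}
ssl.key.password=${store_password}
EOF
  # The file holds passwords, so keep it readable by its owner only.
  chmod 600 "$output_file"
}

# Demo with assumed values; writes to a temporary file and prints it.
demo_file=$(mktemp)
write_client_ssl_properties "/path/to/stores" "test1234" "$demo_file"
cat "$demo_file"
```

The generated file can then be passed to the console consumer through the --consumer.config option shown earlier.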
You have now applied TLS authentication to Kafka brokers, producers and consumers.
You see the following output on the consumer console:
{"test":"wso2"}
{"test":"wso2"}
{"test":"wso2"}