Apache Kafka Security 101

TLS, Kerberos, SASL, and Authorizer in Apache Kafka 0.9 - Enabling New Encryption, Authorization, and Authentication Features

Apache Kafka is frequently used to store critical data making it one of the most important components of a company's data infrastructure. Our goal is to make it possible to run Kafka as a central platform for streaming data, supporting anything from a single app to a whole company. Multi-tenancy is an essential requirement in achieving this vision and, in turn, security features are crucial for multi-tenancy.

Prior to 0.9, Kafka had no built-in security features. One could lock down access at the network level, but this is not viable for a big shared multi-tenant cluster used across a large company. Consequently, securing Kafka has been one of the most requested features. Security is of particular importance today, when cyber-attacks are a common occurrence and the threat of data breaches is a reality for organizations of all sizes, from individual users to entire government entities.

Four key security features were added in Apache Kafka 0.9, which is included in Confluent Platform 2.0:

  1. Administrators can require client authentication using either Kerberos or Transport Layer Security (TLS) client certificates, so that Kafka brokers know who is making each request.

  2. A Unix-like permissions system can be used to control which users can access which data.

  3. Network communication can be encrypted, allowing messages to be securely sent across untrusted networks.

  4. Administrators can require authentication for communication between Kafka brokers and ZooKeeper.

In this post, we will discuss how to secure Kafka using these features. For simplicity, we will assume a brand new cluster; the Confluent documentation describes how to enable security features on a running Kafka cluster. With regards to clients, we will focus on the console and Java clients (a future blog post will cover librdkafka, the C client we maintain).

It’s worth noting that the security features were implemented in a backwards-compatible manner and are disabled by default. In addition, only the new Java clients (and librdkafka) have been augmented with support for security. For the most part, enabling security is simply a matter of configuration and no code changes are required.

Defining the Solution

There are a number of different ways to secure a Kafka cluster depending on one’s requirements. In this post, we will show one possible approach, but Confluent's Kafka Security documentation describes the various options in more detail.

For client/broker and inter-broker communication, we will:

  • Require TLS or Kerberos authentication
  • Encrypt network traffic via TLS
  • Perform authorization via access control lists (ACLs)

For broker/ZooKeeper communication, we will require only Kerberos authentication, as TLS is supported only from ZooKeeper 3.5 onwards, which is still at the alpha release stage.

Network segmentation should be used to restrict access to ZooKeeper. Depending on performance and security requirements, Kafka brokers could be accessible internally, exposed to the public internet, or exposed via a proxy (in some environments, public internet traffic must go through two separate security stacks in order to make it harder for attackers to exploit bugs in a particular security stack).

Before we start

First a note on terminology. Secure Sockets Layer (SSL) is the predecessor of TLS and it has been deprecated since June 2015. However, for historical reasons, Kafka (like Java) uses the term SSL instead of TLS in configuration and code, which can be a bit confusing. We will stick to TLS in this document.

Before we start, we need to generate the TLS keys and certificates, create the Kerberos principals and potentially configure the Java Development Kit (JDK) so that it supports stronger encryption algorithms.

TLS Keys and Certificates

We need to generate a key and certificate for each broker and client in the cluster. The common name (CN) of the broker certificate must match the fully qualified domain name (FQDN) of the server as the client compares the CN with the DNS domain name to ensure that it is connecting to the desired broker (instead of a malicious one).
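
For example, the key-generation step in the script below can be given an explicit CN non-interactively with keytool's -dname flag (the hostname, organization and password here are placeholders):

keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey \
  -dname "CN=broker1.example.com, O=Confluent" -storepass test1234 -keypass test1234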

At this point, each broker has a public-private key pair and an unsigned certificate to identify itself. To prevent forged certificates, it is important for each certificate to be signed by a certificate authority (CA). As long as the CA is a genuine and trusted authority, the clients have high assurance that they are connecting to authentic brokers.

In contrast to the keystore, which stores each application’s identity, the truststore stores all the certificates that the application should trust. Importing a certificate into one’s truststore also means trusting all certificates that are signed by that certificate. This attribute is called the chain of trust, and it is particularly useful when deploying TLS on a large Kafka cluster. You can sign all certificates in the cluster with a single CA, and have all machines share the same truststore that contains the CA certificate. That way all machines can authenticate all other machines. A slightly more complex alternative is to use two CAs, one to sign brokers’ keys and another to sign clients’ keys.

For this exercise, we will generate our own CA, which is simply a public-private key pair and certificate, and we will add the same CA certificate to each client and broker’s truststore.

The following bash script generates the keystore and truststore for brokers (kafka.server.keystore.jks and kafka.server.truststore.jks) and clients (kafka.client.keystore.jks and kafka.client.truststore.jks):

#!/bin/bash
PASSWORD=test1234
VALIDITY=365
# 1. Generate the broker key pair (keytool will prompt for names and passwords; use PASSWORD above)
keytool -keystore kafka.server.keystore.jks -alias localhost -validity $VALIDITY -genkey
# 2. Create our own CA: a key pair plus a self-signed certificate
openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY
# 3. Add the CA certificate to the broker and client truststores
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert
# 4. Export a certificate signing request for the broker key and sign it with the CA
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
# 5. Import the CA certificate and the signed broker certificate into the broker keystore
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
# 6. Repeat the generate/sign/import steps for the client keystore
keytool -keystore kafka.client.keystore.jks -alias localhost -validity $VALIDITY -genkey
keytool -keystore kafka.client.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days $VALIDITY -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client.keystore.jks -alias localhost -import -file cert-signed
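
To sanity-check the result, keytool can print the broker keystore and confirm that the signed certificate chains back to the CA (a quick verification step, not part of the script above):

keytool -list -v -keystore kafka.server.keystore.jks -storepass test1234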

Kerberos

If your organization is already running a Kerberos server, it can also be used for Kafka. Otherwise you will need to install one. Your Linux vendor likely has packages for Kerberos and a guide on how to install and configure it (e.g., Ubuntu, Red Hat).

If you are using the organization’s Kerberos server, ask your Kerberos administrator for a principal for each Kafka broker in your cluster and for every operating system user that will access Kafka with Kerberos authentication (via clients or tools). 

If you have installed your own Kerberos, you will need to create these principals yourself using the following commands:

sudo /usr/sbin/kadmin.local -q 'addprinc -randkey kafka/{hostname}@{REALM}'
sudo /usr/sbin/kadmin.local -q "ktadd -k /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}"

It is a Kerberos requirement that all your hosts can be resolved by their FQDNs.
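
A quick way to verify that a principal and its keytab work is to obtain a ticket with kinit and inspect it with klist, using the same placeholders as above:

kinit -k -t /etc/security/keytabs/{keytabname}.keytab kafka/{hostname}@{REALM}
klist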

Stronger Encryption

Due to import regulations in some countries, the Oracle implementation of Java limits the strength of cryptographic algorithms available by default. If stronger algorithms are needed (for example, AES with 256-bit keys), the JCE Unlimited Strength Jurisdiction Policy Files must be obtained and installed in the JDK/JRE. This affects both TLS and SASL/Kerberos; see the JCA Providers Documentation for more information.
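
One way to check whether the unlimited strength policy files are installed is to ask the JDK for the maximum allowed AES key length (a value of 128 indicates the default restricted policy):

jrunscript -e 'print(javax.crypto.Cipher.getMaxAllowedKeyLength("AES"))'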

Configuring the ZooKeeper ensemble

The ZooKeeper server configuration is relatively straightforward. We enable Kerberos authentication via the Simple Authentication and Security Layer (SASL). In order to do that, we set the authentication provider, require SASL authentication and configure the login renewal period in zookeeper.properties:

authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

We also need to configure Kerberos via the zookeeper_jaas.conf file as follows:

Server {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/path/to/server/keytab"
    storeKey=true
    useTicketCache=false
    principal="zookeeper/yourzkhostname";
};

Finally, we need to pass the path to the JAAS file when starting the ZooKeeper server:

-Djava.security.auth.login.config=/path/to/server/jaas/file.conf
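
With the Confluent Platform wrapper scripts, this is typically done via the KAFKA_OPTS environment variable, as in the full start script shown later:

export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf'
zookeeper-server-start /etc/kafka/zookeeper.properties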

Configuring Kafka brokers

We start by configuring the desired security protocols and ports in server.properties:

listeners=SSL://:9093,SASL_SSL://:9094

We have not enabled an unsecured (PLAINTEXT) port as we want to ensure that all broker/client and inter-broker network communication is encrypted. We choose SSL as the security protocol for inter-broker communication (SASL_SSL is the other possible option given the configured listeners):

security.inter.broker.protocol=SSL

We know that it is difficult to simultaneously upgrade all systems to the new secure clients, so we allow administrators to support a mix of secure and unsecured clients. This can be done by adding a PLAINTEXT port to listeners, but care has to be taken to restrict access to this port to trusted clients only. Network segmentation and/or authorization ACLs can be used to restrict access to trusted IPs in such cases. We will not cover this in more detail as we will not enable a PLAINTEXT port in our example.
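
For illustration only, if legacy clients had to be supported during a transition, the listener list could include a PLAINTEXT port alongside the secure ones:

listeners=PLAINTEXT://:9092,SSL://:9093,SASL_SSL://:9094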

We’ll now go over protocol-specific configuration settings.

TLS

We require TLS client authentication and configure key, keystore and truststore details:

ssl.client.auth=required
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/var/private/ssl/kafka.server.truststore.jks
ssl.truststore.password=test1234

Since we are storing passwords in the broker config, it is important to restrict access via filesystem permissions.
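
For example, the broker config can be restricted to the operating system user running the broker (the kafka user and group here are assumptions about your installation):

chown kafka:kafka /etc/kafka/server.properties
chmod 600 /etc/kafka/server.properties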

SASL/Kerberos

We will enable SASL/Kerberos for broker/client and broker/ZooKeeper communication.

Most of the configuration for SASL lives in JAAS configuration files containing a KafkaServer section for authentication between client and broker and a Client section for authentication between broker and ZooKeeper:

KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};

// Zookeeper client authentication
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_server.keytab"
    principal="kafka/kafka1.hostname.com@EXAMPLE.COM";
};

Note that each broker should have its own keytab and the same primary name should be used across all brokers. In the above example, the principal is kafka/kafka1.hostname.com@EXAMPLE.COM and the primary name is kafka. The keytabs configured in the JAAS file must be readable by the operating system user who is starting the Kafka broker.

We must also configure the service name in server.properties with the primary name of the Kafka brokers:

sasl.kerberos.service.name=kafka

When it comes to ZooKeeper authentication, setting the configuration property zookeeper.set.acl to true in each broker causes brokers to create their znodes with ACLs such that only brokers can modify them, while the znodes remain world-readable. The rationale behind this design is that the data stored in ZooKeeper is not sensitive, but inappropriate manipulation of znodes can cause cluster disruption.

Authorization and ACLs

Kafka ships with a pluggable Authorizer and an out-of-the-box authorizer implementation that uses ZooKeeper to store all the ACLs.

Kafka ACLs are defined in the general format of "Principal P is [Allowed/Denied] Operation O From Host H On Resource R". The available operations cover both client actions (produce, consume, admin) and the inter-broker operations of a cluster. In a secure cluster, both client requests and inter-broker operations require authorization.

We enable the default authorizer by setting the following in server.properties:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

The default behavior is such that if a resource has no associated ACLs, then no one is allowed to access the resource, except super users. Setting broker principals as super users is a convenient way to give them the required access to perform inter-broker operations:

super.users=User:Bob;User:Alice

By default, the TLS user name will be of the form "CN=host1.example.com,OU=,O=Confluent,L=London,ST=London,C=GB". One can change that by setting a customized PrincipalBuilder in server.properties like the following:

principal.builder.class=CustomizedPrincipalBuilderClass

By default, the SASL user name will be the primary part of the Kerberos principal. One can change that by setting sasl.kerberos.principal.to.local.rules to a customized rule in server.properties.
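
As an illustration of the rule syntax, a hypothetical rule like the following (realm name assumed) would map a principal such as kafka-client-1@EXAMPLE.COM to kafka-client-1, falling back to the default mapping otherwise:

sasl.kerberos.principal.to.local.rules=RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//,DEFAULT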

We can use kafka-acls (the Kafka Authorizer CLI) to add, remove or list ACLs. Please run kafka-acls --help for detailed information on the supported options.
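
For example, the ACLs for a given topic can be listed with:

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --list --topic Test-topic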

The most common use cases for ACL management are adding/removing a principal as a producer or consumer and there are convenience options to handle these cases. In order to add User:Bob as a producer of Test-topic we can execute the following:

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:Bob \
  --producer --topic Test-topic

Similarly, to add Alice as a consumer of Test-topic with consumer group Group-1, we specify the --consumer and --group options:

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:Alice \
  --consumer --topic Test-topic --group Group-1
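
The same convenience options work with --remove, so revoking Bob's producer access is symmetrical:

kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 \
  --remove --allow-principal User:Bob \
  --producer --topic Test-topic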

Configuring Kafka Clients

TLS is only supported by the new Kafka producer and consumer; the older clients do not support it. Enabling security is simply a matter of configuration; no code changes are required.

TLS

The configs for TLS will be the same for both producer and consumer. We have to set the desired security protocol as well as the truststore and keystore information since we are using mutual authentication:

security.protocol=SSL
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

Given that passwords are being stored in the client config, it is important to restrict access to the file via filesystem permissions.

SASL/Kerberos

Clients (producers, consumers, Connect workers, etc.) will authenticate to the cluster with their own principal (usually with the same name as the user running the client), so we need to obtain or create these principals as needed. Then we create a JAAS file for each principal. The KafkaClient section describes how clients like the producer and consumer can connect to the Kafka broker. The following is an example configuration for a client using a keytab (recommended for long-running processes):

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka_client.keytab"
    principal="kafka-client-1@EXAMPLE.COM";
};

For command-line utilities like kafka-console-consumer or kafka-console-producer, kinit can be used along with useTicketCache=true as in:

KafkaClient {
    com.sun.security.auth.module.Krb5LoginModule required
    useTicketCache=true;
};
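
In that case, the user obtains a ticket before running the tool (principal name hypothetical):

kinit kafka-client-1@EXAMPLE.COM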

The security protocol and service name are set in producer.properties and/or consumer.properties. We also have to include the truststore details since we are using SASL_SSL instead of SASL_PLAINTEXT:

security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=test1234
# keystore configuration should not be needed, see note below
ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

The keystore configuration is needed in 0.9.0.0 due to a bug in Kafka, which has been fixed and will be included in 0.9.0.1.

Finally, we pass the name of the JAAS file as a JVM parameter to the client JVM:

-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf

The keytabs configured in kafka_client_jaas.conf must be readable by the operating system user who is starting the Kafka client.

Putting It All Together

We have created a Vagrant setup based on CentOS 7.2 that includes a Kerberos server, Kafka and OpenJDK 1.8.0 to make it easier to test all the pieces together. Please install Vagrant and VirtualBox (if you haven’t already) and then:

  1. Clone the Git repository: git clone https://github.com/confluentinc/securing-kafka-blog

  2. Change directory: cd securing-kafka-blog

  3. Start and provision the vagrant environment: vagrant up

  4. Connect to the VM via SSH: vagrant ssh

ZooKeeper and Broker

Before we start ZooKeeper and the Kafka broker, let’s take a look at their config files:

/etc/kafka/zookeeper.properties
dataDir=/var/lib/zookeeper
clientPort=2181
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

/etc/kafka/zookeeper_jaas.conf
Server {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    keyTab="/etc/security/keytabs/zookeeper.keytab"
    storeKey=true
    useTicketCache=false
    principal="zookeeper/kafka.example.com@EXAMPLE.COM";
};

/etc/kafka/server.properties
broker.id=0
listeners=SSL://:9093,SASL_SSL://:9095
security.inter.broker.protocol=SSL
zookeeper.connect=kafka.example.com:2181
log.dirs=/var/lib/kafka
zookeeper.set.acl=true
ssl.client.auth=required
ssl.keystore.location=/etc/security/tls/kafka.server.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
ssl.truststore.location=/etc/security/tls/kafka.server.truststore.jks
ssl.truststore.password=test1234
sasl.kerberos.service.name=kafka
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:CN=kafka.example.com,OU=,O=Confluent,L=London,ST=London,C=GB

/etc/kafka/kafka_server_jaas.conf
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    principal="kafka/kafka.example.com@EXAMPLE.COM";
};

Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/etc/security/keytabs/kafka.keytab"
    principal="kafka/kafka.example.com@EXAMPLE.COM";
};

And the start script (/usr/sbin/start-zk-and-kafka) looks like:

export KAFKA_HEAP_OPTS='-Xmx256M'
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/zookeeper_jaas.conf'
/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties &
sleep 5
export KAFKA_OPTS='-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf'
/usr/bin/kafka-server-start /etc/kafka/server.properties &

After executing the script (sudo /usr/sbin/start-zk-and-kafka), the following should be in the server.log:

INFO Registered broker 0 at path /brokers/ids/0 with addresses: SSL -> EndPoint(kafka.example.com,9093,SSL),SASL_SSL -> EndPoint(kafka.example.com,9095,SASL_SSL)

Create topic

Because we configured ZooKeeper to require SASL authentication, we need to set the java.security.auth.login.config system property while starting the kafka-topics tool:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
kafka-topics --create --topic securing-kafka --replication-factor 1 --partitions 3 --zookeeper kafka.example.com:2181

We used the server principal and keytab for this example, but you may want to create a separate principal and keytab for tools such as this.
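
With the same KAFKA_OPTS still exported, the new topic can be verified with:

kafka-topics --describe --topic securing-kafka --zookeeper kafka.example.com:2181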

Set ACLs

We authorize the clients to access the newly created topic:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf"
kafka-acls --authorizer-properties zookeeper.connect=kafka.example.com:2181 \
  --add --allow-principal User:kafkaclient \
  --producer --topic securing-kafka

kafka-acls --authorizer-properties zookeeper.connect=kafka.example.com:2181 \
  --add --allow-principal User:kafkaclient \
  --consumer --topic securing-kafka --group securing-kafka-group

Console Clients

The client configuration is slightly different depending on whether we want the client to use TLS or SASL/Kerberos.

TLS

The console producer is a convenient way to send a small amount of data to the broker:

kafka-console-producer --broker-list kafka.example.com:9093 --topic securing-kafka --producer.config /etc/kafka/producer_ssl.properties
message1
message2
message3

producer_ssl.properties
bootstrap.servers=kafka.example.com:9093
security.protocol=SSL
ssl.truststore.location=/etc/security/tls/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/etc/security/tls/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

The console consumer is a convenient way to consume messages:

kafka-console-consumer --bootstrap-server kafka.example.com:9093 --topic securing-kafka --new-consumer --from-beginning --consumer.config /etc/kafka/consumer_ssl.properties

consumer_ssl.properties
bootstrap.servers=kafka.example.com:9093
group.id=securing-kafka-group
security.protocol=SSL
ssl.truststore.location=/etc/security/tls/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/etc/security/tls/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

The output should be:

message1
message2
message3

SASL/Kerberos

Using the console clients via SASL/Kerberos is similar, but we also need to pass the location of the JAAS file via a system property. An example for the console consumer follows:

export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf"
kafka-console-consumer --bootstrap-server kafka.example.com:9095 --topic securing-kafka --new-consumer --from-beginning --consumer.config /etc/kafka/consumer_sasl.properties

consumer_sasl.properties
bootstrap.servers=kafka.example.com:9095
group.id=securing-kafka-group
security.protocol=SASL_SSL
sasl.kerberos.service.name=kafka
ssl.truststore.location=/etc/security/tls/kafka.client.truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/etc/security/tls/kafka.client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234

Java Clients

Configuring the new Java clients (both producer and consumer) is simply a matter of passing the configs used in the "Console Clients" example above to the relevant client constructor. This can be done by either loading the properties from a file or by setting the properties programmatically. An example of setting the properties programmatically for a consumer configured to use TLS follows:

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SslConfigs;

Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9093");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "securing-kafka-group");
// The consumer also requires deserializers; string deserializers are shown here as an example
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.truststore.jks");
props.setProperty(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");
props.setProperty(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "/etc/security/tls/kafka.client.keystore.jks");
props.setProperty(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
props.setProperty(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Debugging problems

Things often don't work on the first attempt when it comes to configuring security systems, and debugging output can be quite helpful in diagnosing the cause of the problem:

  1. SSL debug output can be enabled via the javax.net.debug system property, e.g. export KAFKA_OPTS=-Djavax.net.debug=all

  2. SASL debug output can be enabled via the sun.security.krb5.debug system property, e.g. export KAFKA_OPTS=-Dsun.security.krb5.debug=true

  3. Kafka authentication logging can be enabled by changing WARN to DEBUG in the following line of the log4j.properties file included in the Kafka distribution (in /etc/kafka/log4j.properties in the Confluent Platform):

    log4j.logger.kafka.authorizer.logger=WARN, authorizerAppender
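
After the change, the line reads:

    log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender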

Conclusion

We have shown how the security features introduced in Apache Kafka 0.9 (part of Confluent Platform 2.0) can be used to secure a Kafka cluster. We focused on ensuring that authentication was required for all network communication and network encryption was applied to all broker/client and inter-broker network traffic.

These security features were a very good first step, but we will be making them better, faster and simpler to operate in subsequent releases. As ever, we would love to hear your feedback in the Kafka mailing list or Confluent Platform Google Group.

Acknowledgements

We would like to thank Ben Stopford, Flavio Junqueira, Gwen Shapira, Joel Koshy, Jun Rao, Parth Brahmbhatt, Rajini Sivaram, and Sriharsha Chintalapani for their contributions to the security features introduced in Apache Kafka 0.9, and our colleagues at Confluent for feedback on the draft version of this post.
