kafka: kerberos and authz support (#79)
* Added Kerberos and authz support and configuration; modified the health check to support Kerberos and authz

* Added descriptions and display names for the Kerberos and authz params

* Use mesosphere kafka image

* Use kafka image 0.3.0; minor fixes

* [DCOS-58615] KUDO Kafka document kerberos feature (#1)

* Add security.md

* Updated settings.md

* Added documentation for Kerberos and ACL functionality of KUDO Kafka

* Added kerberos with health check settings in configuration.md
shubhanilBag authored and zmalik committed Oct 5, 2019
1 parent 9b461c1 commit 01ed089
Showing 11 changed files with 409 additions and 29 deletions.
10 changes: 10 additions & 0 deletions repository/kafka/docs/latest/configuration.md
@@ -31,6 +31,16 @@ This check is a producer-consumer check based on a custom heartbeat topic which
```
kubectl kudo install kafka --instance=my-kafka-name -p LIVENESS_METHOD=FUNCTIONAL -p LIVENESS_TOPIC_PREFIX=MyHealthCheckTopic
```
###### Using Kerberos with health checks

Health checks can be enabled when using [Kerberos with KUDO Kafka](security.md).
When using the `FUNCTIONAL` method, additional principals need to be created. Assuming `livenessProbe` as the principal primary, the principals will be:
```
livenessProbe/kafka-kafka-0.kafka-svc.kudo-kafka.svc.cluster.local@LOCAL
livenessProbe/kafka-kafka-1.kafka-svc.kudo-kafka.svc.cluster.local@LOCAL
livenessProbe/kafka-kafka-2.kafka-svc.kudo-kafka.svc.cluster.local@LOCAL
```
You need to create one such liveness-probe principal per broker.
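
As a sketch, assuming an MIT Kerberos KDC with `kadmin` access and the realm and namespace from the examples in [security.md](security.md), the liveness principals could be created with:
```
# Create one liveness-probe principal per broker (indices 0..BROKER_COUNT-1)
for i in 0 1 2; do
  kadmin -q "addprinc -randkey livenessProbe/kafka-kafka-${i}.kafka-svc.kudo-kafka.svc.cluster.local@LOCAL"
done
```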

##### Storage

86 changes: 86 additions & 0 deletions repository/kafka/docs/latest/security.md
@@ -0,0 +1,86 @@
# Security

## Authentication

KUDO Kafka currently supports Kerberos authentication.

### Kerberos Authentication

Kerberos authentication relies on a central authority to verify that Kafka clients (whether brokers, consumers, or producers) are who they say they are. KUDO Kafka integrates with your existing Kerberos infrastructure to verify the identity of clients.

#### Prerequisites

* The hostname and port of a KDC reachable from inside the Kubernetes cluster
* Sufficient access to the KDC to create Kerberos principals
* Sufficient access to the KDC to retrieve a keytab for the generated principals
* `kubectl` installed

#### Configure Kerberos Authentication

##### Create principals

The KUDO Kafka service requires a Kerberos principal for each broker to be deployed. Each principal must be of the form
```
<service primary>/kafka-kafka-<broker index>.kafka-svc.<namespace>.svc.cluster.local@<service realm>
```
with:
* `service primary` = `KERBEROS_PRIMARY`
* `broker index` = `0` up to `BROKER_COUNT - 1`
* `namespace` = the Kubernetes namespace
* `service realm` = `KERBEROS_REALM`

For example, if installing with these options:
```
kubectl kudo install kafka \
--instance=kafka --namespace=kudo-kafka \
-p ZOOKEEPER_URI=zk-zookeeper-0.zk-hs:2181,zk-zookeeper-1.zk-hs:2181,zk-zookeeper-2.zk-hs:2181 \
-p KERBEROS_ENABLED=true \
-p KERBEROS_DEBUG=false \
-p KERBEROS_PRIMARY=kafka \
-p KERBEROS_REALM=LOCAL \
-p KERBEROS_KDC_HOSTNAME=kdc-service.kudo-kafka.svc.cluster.local \
-p KERBEROS_KDC_PORT=2500 \
-p KERBEROS_KEYTAB_SECRET="base64-kafka-keytab-secret"
```
then the principals to create would be:
```
kafka/kafka-kafka-0.kafka-svc.kudo-kafka.svc.cluster.local@LOCAL
kafka/kafka-kafka-1.kafka-svc.kudo-kafka.svc.cluster.local@LOCAL
kafka/kafka-kafka-2.kafka-svc.kudo-kafka.svc.cluster.local@LOCAL
```
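
As a sketch, assuming an MIT Kerberos KDC with `kadmin` access, these principals could be created with:
```
# One broker principal per broker index (0..BROKER_COUNT-1)
for i in 0 1 2; do
  kadmin -q "addprinc -randkey kafka/kafka-kafka-${i}.kafka-svc.kudo-kafka.svc.cluster.local@LOCAL"
done
```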

##### Place Service Keytab in Kubernetes Secret Store

The KUDO Kafka service uses a single keytab containing all broker principals (the service keytab). After creating the principals above, generate the service keytab, making sure to include every principal. Store it as a secret in the Kubernetes Secret Store using `base64` encoding.
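
As a sketch (assuming MIT Kerberos tooling; the secret name below matches the `KERBEROS_KEYTAB_SECRET` default, and the secret key must be `kafka.keytab` because the bootstrap script decodes `/kafka-keytab/kafka.keytab`):
```
# Export each broker principal into one keytab (add the liveness
# principals here too if the FUNCTIONAL liveness check is used)
for i in 0 1 2; do
  kadmin -q "ktadd -k kafka.keytab kafka/kafka-kafka-${i}.kafka-svc.kudo-kafka.svc.cluster.local@LOCAL"
done
# base64-encode the keytab and store it as the secret referenced
# by KERBEROS_KEYTAB_SECRET
base64 kafka.keytab > kafka.keytab.base64
kubectl create secret generic base64-kafka-keytab-secret \
  --from-file=kafka.keytab=kafka.keytab.base64 \
  --namespace=kudo-kafka
```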

## Authorization

The KUDO Kafka service supports Kafka’s ACL-based authorization system. To use Kafka’s ACLs, Kerberos authentication must be enabled as detailed above.

### Enable Authorization

#### Prerequisites

* Completion of the Kerberos authentication setup above.

#### Install the Service

Install the KUDO Kafka service with the following options in addition to your own (remember, Kerberos must be enabled):

```
kubectl kudo install kafka \
--instance=kafka --namespace=kudo-kafka \
-p ZOOKEEPER_URI=zk-zookeeper-0.zk-hs:2181,zk-zookeeper-1.zk-hs:2181,zk-zookeeper-2.zk-hs:2181 \
-p BROKER_COUNT=3 \
-p KERBEROS_ENABLED=true \
-p KERBEROS_DEBUG=false \
-p KERBEROS_PRIMARY=kafka \
-p KERBEROS_REALM=LOCAL \
-p KERBEROS_KEYTAB_SECRET="base64-kafka-keytab-secret" \
-p AUTHORIZATION_ENABLED=<true|false; default false> \
-p AUTHORIZATION_ALLOW_EVERYONE_IF_NO_ACL_FOUND=<true|false; default false> \
-p AUTHORIZATION_SUPER_USERS="User:User1"
```

The format of the list is `User:user1;User:user2;...`. With Kerberos authentication, the “user” value is the Kerberos primary. The Kafka brokers themselves are automatically designated as super users.
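
As an illustration (not part of the operator), ACLs for a client principal can be granted with Kafka's stock `kafka-acls.sh` tool. This sketch assumes the tool ships in the broker image under `$KAFKA_HOME/bin` and that ACLs are stored in ZooKeeper, with `client` and `test-topic` as hypothetical names:
```
kubectl exec -n kudo-kafka kafka-kafka-0 -- bash -c \
  '$KAFKA_HOME/bin/kafka-acls.sh \
    --authorizer-properties zookeeper.connect=zk-zookeeper-0.zk-hs:2181 \
    --add --allow-principal User:client \
    --operation Read --operation Write \
    --topic test-topic'
```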

NOTE: It is possible to enable authorization after initial installation, but the service may become unavailable during the transition. Additionally, Kafka clients may fail if they do not have the correct ACLs assigned to their principals. During the transition, `AUTHORIZATION_ALLOW_EVERYONE_IF_NO_ACL_FOUND` can be set to `true` to prevent clients from failing until their ACLs are set correctly. After the transition, `AUTHORIZATION_ALLOW_EVERYONE_IF_NO_ACL_FOUND` should be reset to `false`.
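
For example, assuming a KUDO CLI version that supports `kubectl kudo update`, the transition could look like:
```
# Enable authorization but keep the permissive fallback while ACLs are created
kubectl kudo update --instance=kafka --namespace=kudo-kafka \
  -p AUTHORIZATION_ENABLED=true \
  -p AUTHORIZATION_ALLOW_EVERYONE_IF_NO_ACL_FOUND=true
# After the required ACLs are in place, tighten the fallback
kubectl kudo update --instance=kafka --namespace=kudo-kafka \
  -p AUTHORIZATION_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false
```
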
10 changes: 8 additions & 2 deletions repository/kafka/operator/operator.yaml
@@ -12,17 +12,23 @@ tasks:
resources:
- service.yaml
- pdb.yaml
- configmap.yaml
- server.properties.yaml
- bootstrap.yaml
- metrics-config.yaml
- health-check.yaml
- jaas-config.yaml
- krb5-config.yaml
- statefulset.yaml
update:
resources:
- service.yaml
- pdb.yaml
- configmap.yaml
- server.properties.yaml
- bootstrap.yaml
- metrics-config.yaml
- health-check.yaml
- jaas-config.yaml
- krb5-config.yaml
- statefulset.yaml
not-allowed:
resources:
60 changes: 60 additions & 0 deletions repository/kafka/operator/params.yaml
@@ -1,3 +1,18 @@
AUTHORIZATION_ENABLED:
description: "Enable authorization."
default: "false"
displayName: "Authorization Enabled"

AUTHORIZATION_SUPER_USERS:
description: "Semi-colon delimited list of principals. For Kerberos principals, these will be of the form User:<Kerberos-Primary>. For TLS, they will be of the form User:<CN of TLS cert> (for the TLS cert CN=test-user,OU=,O=Confluent,L=London,ST=London,C=GB, the user would be test-user)"
default: ""
displayName: "Super Users"

AUTHORIZATION_ALLOW_EVERYONE_IF_NO_ACL_FOUND:
description: "Allow any user to perform an action if no ACL is found for the resource."
default: "false"
displayName: "Allow everyone if no acl found"

BROKER_COUNT:
description: "Number of brokers spun up for Kafka"
default: "3"
@@ -94,6 +109,46 @@ DELETE_RECORDS_PURGATORY_PURGE_INTERVAL_REQUESTS:
default: "1"
description: "The purge interval (in number of requests) of the delete records request purgatory"

KERBEROS_ENABLED:
description: "Enable kerberos authentication."
default: "false"
displayName: "Kerberos Enabled"

KERBEROS_ENABLED_FOR_ZOOKEEPER:
description: "Enable Kerberos authentication for communication with Apache Zookeeper."
default: "false"
displayName: "Zookeeper Kerberos Enabled"

KERBEROS_PRIMARY:
description: "The Kerberos primary used by Kafka tasks."
default: "kafka"
displayName: "Kerberos Primary"

KERBEROS_KEYTAB_SECRET:
description: "The name of the Kubernetes secret storing the keytab."
default: "base64-kafka-keytab-secret"
displayName: "Kerberos Keytab Secret"

KERBEROS_REALM:
description: "The Kerberos realm used to render the principal of Kafka broker pods."
default: "LOCAL"
displayName: "Kerberos Realm"

KERBEROS_KDC_HOSTNAME:
description: "The hostname or address of a host running a KDC for the realm."
default: "kdc-service"
displayName: "Kerberos KDC Hostname"

KERBEROS_KDC_PORT:
description: "The port of the host running a KDC for the realm."
default: "2500"
displayName: "Kerberos KDC Port"

KERBEROS_DEBUG:
description: "Turn debug Kerberos logging on or off to assist in diagnosing issues with Kerberos authentication."
default: "false"
displayName: "Kerberos Debug"

LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS:
default: "300"
description: "The frequency with which the partition rebalance check is triggered by the controller"
@@ -132,6 +187,11 @@ LIVENESS_TOPIC_PREFIX:
description: "This topic is used by livenessProbe when 'FUNCTIONAL' method is selected."
default: "KafkaLivenessTopic"

LIVENESS_KERBEROS_PRIMARY:
default: "livenessProbe"
description: "The Kerberos primary used by the liveness probe when using FUNCTIONAL livenessProbe method."
displayName: "Liveness Probe Kerberos Primary"

LOG_FLUSH_INTERVAL_MESSAGES:
default: "9223372036854775807"
description: "The number of messages accumulated on a log partition before messages are flushed to disk"
99 changes: 99 additions & 0 deletions repository/kafka/operator/templates/bootstrap.yaml
@@ -0,0 +1,99 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: bootstrap
data:
bootstrap.sh: |
#!/usr/bin/env bash
cp /health-check-script/health-check.sh health-check.sh;
chmod +x health-check.sh;
{{ if eq .Params.KERBEROS_ENABLED "true" }}
cat /kafka-keytab/kafka.keytab | base64 --decode > kafka.keytab;
cp /jaas-config/kafka_server_jaas.conf $KAFKA_HOME/config/kafka_server_jaas.conf;
cp /krb5-config/krb5.conf $KAFKA_HOME/config/krb5.conf;
sed -i "s/<HOSTNAME>/$(hostname -f)/g" $KAFKA_HOME/config/kafka_server_jaas.conf;
export KAFKA_OPTS="-Djava.security.auth.login.config=${KAFKA_HOME}/config/kafka_server_jaas.conf -Djava.security.krb5.conf=${KAFKA_HOME}/config/krb5.conf $KAFKA_OPTS"
{{ if eq .Params.KERBEROS_DEBUG "true" }}
export KAFKA_OPTS="-Dsun.security.krb5.debug=true $KAFKA_OPTS"
{{ end }}
{{ end }}
KAFKA_BROKER_ID=${HOSTNAME##*-}
# LISTENERS CONFIGURATION
LISTENERS="INTERNAL://0.0.0.0:${KAFKA_BROKER_PORT}"
# ADVERTISED LISTENERS
ADVERTISED_LISTENERS="INTERNAL://$(hostname -f):${KAFKA_BROKER_PORT}"
{{ if eq .Params.KERBEROS_ENABLED "true" }}
LISTENER_SECURITY_PROTOCOL_MAP="INTERNAL:SASL_PLAINTEXT"
# INTER_BROKER_SECURITY_PROTOCOL="SASL_PLAINTEXT"
{{ else }}
LISTENER_SECURITY_PROTOCOL_MAP="INTERNAL:PLAINTEXT"
# INTER_BROKER_SECURITY_PROTOCOL="PLAINTEXT"
{{ end }}
SASL_ENABLED_MECHANISMS=""
if [[ "$KAFKA_CLIENT_ENABLED" = "true" ]]; then
LISTENERS="${LISTENERS},CLIENT://0.0.0.0:${KAFKA_CLIENT_PORT}"
ADVERTISED_LISTENERS="${ADVERTISED_LISTENERS},CLIENT://$(hostname -f):${KAFKA_CLIENT_PORT}"
if [[ "$KAFKA_CLIENT_AUTHENTICATION" = "scram-sha-512" ]]; then
SASL_ENABLED_MECHANISMS="SCRAM-SHA-512\n$SASL_ENABLED_MECHANISMS"
LISTENER_SECURITY_PROTOCOL_MAP="${LISTENER_SECURITY_PROTOCOL_MAP},CLIENT:SASL_PLAINTEXT"
CLIENT_LISTENER=$(cat <<EOF
# CLIENT listener authentication
listener.name.client.scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required;
EOF
)
else
LISTENER_SECURITY_PROTOCOL_MAP="${LISTENER_SECURITY_PROTOCOL_MAP},CLIENT:PLAINTEXT"
fi
fi
export KAFKA_LOG_DIR_PATH="${LOG_DIR}/log${KAFKA_BROKER_ID}"
if [[ -e ${KAFKA_HOME}/init/rack.id ]]; then
export RACK_ID=$(cat ${KAFKA_HOME}/init/rack.id)
fi
{{ if eq .Params.AUTHORIZATION_ENABLED "true" }}
# Calculate Kafka Authorization Super Users
SUPER_USERS=()
{{ if .Params.AUTHORIZATION_SUPER_USERS }}
SUPER_USERS="{{ .Params.AUTHORIZATION_SUPER_USERS }}"
SUPER_USERS=(${SUPER_USERS//;/ })
{{ end }}
{{ if eq .Params.KERBEROS_ENABLED "true" }}
SUPER_USERS=("${SUPER_USERS[@]}" "User:{{ .Params.KERBEROS_PRIMARY }}")
{{ end }}
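# Join the array into the semicolon-delimited string expected by super.users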
SUPER_USERS=$(printf ";%s" "${SUPER_USERS[@]}")
SUPER_USERS=${SUPER_USERS:1}
{{ end }}
# Set Environment
echo "KAFKA_OPTS=\"$KAFKA_OPTS\"" > ${KAFKA_HOME}/.env
KAFKA_CONFIGURATION=$(cat /config/server.properties)
# Write the config file
cat > ${KAFKA_HOME}/server.properties <<EOF
broker.id=${KAFKA_BROKER_ID}
broker.rack=${RACK_ID}
# Listeners
listeners=${LISTENERS}
advertised.listeners=${ADVERTISED_LISTENERS}
listener.security.protocol.map=${LISTENER_SECURITY_PROTOCOL_MAP}
inter.broker.listener.name=INTERNAL
#security.inter.broker.protocol=${INTER_BROKER_SECURITY_PROTOCOL}
{{ if eq .Params.AUTHORIZATION_ENABLED "true" }}
super.users=${SUPER_USERS}
{{ end }}
# Logs
log.dirs=${KAFKA_LOG_DIR_PATH}
# Provided configuration
${KAFKA_CONFIGURATION}
EOF
