Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds ACL Integration to Ziggurat #284

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
46 changes: 29 additions & 17 deletions project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.11.1"
(defproject tech.gojek/ziggurat "5.0.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand All @@ -15,24 +15,33 @@
[com.cemerick/url "0.1.1"]
[com.datadoghq/java-dogstatsd-client "2.4"]
[com.fasterxml.jackson.core/jackson-databind "2.9.9"]
[com.novemberain/langohr "5.2.0" :exclusions [org.clojure/clojure]]
[com.novemberain/langohr "5.2.0" :exclusions [org.clojure/clojure org.slf4j/slf4j-api]]
[com.taoensso/nippy "3.1.1"]
[io.dropwizard.metrics5/metrics-core "5.0.0" :scope "compile"]
[medley "1.3.0" :exclusions [org.clojure/clojure]]
[mount "0.1.16"]
[io.jaegertracing/jaeger-core "1.6.0"]
[io.jaegertracing/jaeger-client "1.6.0"]
[io.jaegertracing/jaeger-core "1.6.0" :exclusions [org.slf4j/slf4j-api]]
[io.jaegertracing/jaeger-client "1.6.0" :exclusions [org.jetbrains.kotlin/kotlin-stdlib-common
org.slf4j/slf4j-api]]
[org.apache.httpcomponents/fluent-hc "4.5.13"]
[org.apache.kafka/kafka-clients "2.8.2" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-streams "2.8.2" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "3.7.0" :exclusions [log4j
org.lz4/lz4-java
org.slf4j/slf4j-api
org.slf4j/slf4j-log4j12]]
[org.apache.kafka/kafka-streams "3.7.0" :exclusions [log4j
org.lz4/lz4-java
org.slf4j/slf4j-api
org.slf4j/slf4j-log4j12
com.fasterxml.jackson.core/jackson-databind
com.fasterxml.jackson.core/jackson-annotations]]
[org.clojure/clojure "1.10.3"]
[org.clojure/tools.logging "1.1.0"]
[nrepl/nrepl "0.8.3"]
[clojusc/protobuf "3.5.1-v1.1"]
[prismatic/schema "1.1.12"]
[clj-statsd "0.4.0"]
[ring/ring "1.9.3"]
[ring/ring-core "1.9.3"]
[ring/ring-core "1.9.3" :exclusions [commons-codec]]
[ring/ring-defaults "0.3.2"]
[ring/ring-jetty-adapter "1.9.3"]
[ring/ring-json "0.5.1"]
Expand All @@ -41,18 +50,15 @@
[com.newrelic.agent.java/newrelic-api "6.5.0"]
[yleisradio/new-reliquary "1.1.0" :exclusions [org.clojure/clojure]]
[metosin/ring-swagger "0.26.2"
:exclusions [cheshire
com.fasterxml.jackson.core/jackson-core
com.fasterxml.jackson.dataformat/jackson-dataformat-smile
com.fasterxml.jackson.dataformat/jackson-dataformat-cbor]]
:exclusions [org.mozilla/rhino com.fasterxml.jackson.dataformat/jackson-dataformat-smile com.fasterxml.jackson.dataformat/jackson-dataformat-cbor cheshire com.google.code.findbugs/jsr305 com.fasterxml.jackson.core/jackson-core]]
[metosin/ring-swagger-ui "3.46.0"]
[cambium/cambium.core "1.1.0"]
[cambium/cambium.core "1.1.0" :exclusions [org.slf4j/slf4j-api]]
[cambium/cambium.codec-cheshire "1.0.0"]
[cambium/cambium.logback.json "0.4.4"]
[ch.qos.logback/logback-classic "1.2.9"]
[cambium/cambium.logback.json "0.4.4" :exclusions [com.fasterxml.jackson.core/jackson-annotations com.fasterxml.jackson.core/jackson-databind]]
[ch.qos.logback/logback-classic "1.2.9" :exclusions [org.slf4j/slf4j-api]]
[ch.qos.logback.contrib/logback-json-classic "0.1.5"]
[ch.qos.logback.contrib/logback-jackson "0.1.5"]
[net.logstash.logback/logstash-logback-encoder "6.6"]
[net.logstash.logback/logstash-logback-encoder "6.6" :exclusions [com.fasterxml.jackson.core/jackson-databind com.fasterxml.jackson.core/jackson-core]]
[clj-commons/iapetos "0.1.9"]
[org.apache.commons/commons-pool2 "2.11.1"]]
:deploy-repositories [["clojars" {:url "https://clojars.org/repo"
Expand All @@ -72,8 +78,14 @@
:dependencies [[com.google.protobuf/protobuf-java "3.17.0"]
[junit/junit "4.13.2"]
[org.hamcrest/hamcrest-core "2.2"]
[org.apache.kafka/kafka-streams "2.8.2" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.8.2" :classifier "test"]
[org.apache.kafka/kafka-streams "3.7.0" :classifier "test" :exclusions [log4j
org.lz4/lz4-java
org.slf4j/slf4j-api
org.slf4j/slf4j-log4j12
com.fasterxml.jackson.core/jackson-databind
com.fasterxml.jackson.core/jackson-annotations]]
[org.apache.kafka/kafka-clients "3.7.0" :classifier "test" :exclusions [org.slf4j/slf4j-api]]
[org.apache.kafka/kafka-streams-test-utils "3.7.0" :classifier "test" :exclusions [org.lz4/lz4-java log4j org.slf4j/slf4j-log4j12 org.slf4j/slf4j-api]]
[org.clojure/test.check "1.1.0"]]
:plugins [[lein-cloverage "1.2.2" :exclusions [org.clojure/clojure]]]
:cloverage {:exclude-call ['cambium.core/info
Expand Down
79 changes: 65 additions & 14 deletions src/ziggurat/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
[mount.core :refer [defstate]]
[ziggurat.util.java-util :as util])
(:import (java.util Properties)
[org.apache.kafka.common.config SaslConfigs])
[org.apache.kafka.common.config SaslConfigs SslConfigs]
[org.apache.kafka.clients CommonClientConfigs])
(:gen-class
:methods
[^{:static true} [get [String] Object]
Expand Down Expand Up @@ -95,6 +96,9 @@
(defn ssl-config []
(get-in config [:ziggurat :ssl]))

(defn sasl-config []
(get-in config [:ziggurat :sasl]))

(defn rabbitmq-config []
(get (ziggurat-config) :rabbit-mq))

Expand Down Expand Up @@ -168,6 +172,10 @@
:enabled
:jaas])

(defn- not-blank?
[s]
(and (not (nil? s)) (not (str/blank? (str/trim s)))))

(defn- to-list
[s]
(if (empty? s)
Expand Down Expand Up @@ -197,26 +205,32 @@
(.setProperty p sk nv))))
p)

(def jaas-template
{"PLAIN" "org.apache.kafka.common.security.plain.PlainLoginModule"
"SCRAM-SHA-512" "org.apache.kafka.common.security.scram.ScramLoginModule"})

(defn create-jaas-properties
[user-name password mechanism]
(let [jaas-template (get jaas-template mechanism)]
(format "%s required username=\"%s\" password=\"%s\";" jaas-template user-name password)))
(defn create-jaas-properties [username password login-module]
(let [username-str (when (not-blank? username) (format " username=\"%s\"" username))
password-str (when (not-blank? password) (format " password=\"%s\"" password))
credentials (str username-str password-str)]
(format "%s required%s;" login-module (if (empty? credentials) "" credentials))))

(defn- add-jaas-properties
[properties jaas-config]
(if (some? jaas-config)
(let [username (get jaas-config :username)
password (get jaas-config :password)
mechanism (get jaas-config :mechanism)]
login-module (get jaas-config :login-module)
jaas_props (create-jaas-properties username password login-module)]
(doto properties
(.put SaslConfigs/SASL_JAAS_CONFIG
(create-jaas-properties username password mechanism))))
(.put SaslConfigs/SASL_JAAS_CONFIG jaas_props)))
properties))

(defn- add-sasl-properties
([properties mechanism protocol]
(add-sasl-properties properties mechanism protocol nil))
([properties mechanism protocol login-callback-handler]
(when (some? mechanism) (.put properties SaslConfigs/SASL_MECHANISM mechanism))
(when (some? protocol) (.put properties CommonClientConfigs/SECURITY_PROTOCOL_CONFIG protocol))
(when (some? login-callback-handler) (.put properties SaslConfigs/SASL_LOGIN_CALLBACK_HANDLER_CLASS login-callback-handler))
properties))

(defn build-ssl-properties
[properties set-property-fn ssl-config-map]
"Builds SSL properties from ssl-config-map which is a map where keys are
Expand All @@ -232,18 +246,54 @@
{:enabled true
:ssl-keystore-location <>
:ssl-keystore-password <>
:mechanism <>
:protocol <>
{:jaas {:username <>
:password <>
:mechanism <>}}}
:login-module <>}}}
"
(let [ssl-configs-enabled (:enabled ssl-config-map)
jaas-config (get ssl-config-map :jaas)]
jaas-config (get ssl-config-map :jaas)
mechanism (get ssl-config-map :mechanism)
protocol (get ssl-config-map :protocol)]
(if (true? ssl-configs-enabled)
(as-> properties pr
(add-jaas-properties pr jaas-config)
(add-sasl-properties pr mechanism protocol)
(reduce-kv set-property-fn pr ssl-config-map))
properties)))

(defn build-sasl-properties
[properties set-property-fn sasl-config-map]
"Builds SASL properties from sasl-config-map which is a map where keys are
Clojure keywords in kebab case. These keys are converted to Kafka properties by set-property-fn.

SASL properties are only set if [:ziggurat :sasl :enabled] returns true.

Creates JAAS template if values are provided in the map provided against this key sequence
[:ziggurat :sasl :jaas].

Example of sasl-config-map
{:enabled true
:protocol <>
:mechanism <>
{:jaas
{:username <>
:password <>
:login-module <>}}}
"
(let [sasl-configs-enabled (:enabled sasl-config-map)
jaas-config (get sasl-config-map :jaas)
mechanism (get sasl-config-map :mechanism)
protocol (get sasl-config-map :protocol)
login-callback-handler (get sasl-config-map :login-callback-handler)]
(if (true? sasl-configs-enabled)
(as-> properties pr
(add-jaas-properties pr jaas-config)
(add-sasl-properties pr mechanism protocol login-callback-handler)
(reduce-kv set-property-fn pr sasl-config-map))
properties)))

(defn build-properties
"Builds Properties object from the provided config-map which is a map where keys are
Clojure keywords in kebab case. These keys are converted to Kafka properties by set-property-fn.
Expand All @@ -265,6 +315,7 @@
[set-property-fn config-map]
(as-> (Properties.) pr
(build-ssl-properties pr set-property-fn (ssl-config))
(build-sasl-properties pr set-property-fn (sasl-config))
(reduce-kv set-property-fn pr config-map)))

(def build-consumer-config-properties
Expand Down
Loading
Loading