From 41d69a6182891705cc752182790a47bbb58bcd48 Mon Sep 17 00:00:00 2001 From: Akash Chhabra Date: Tue, 15 Oct 2024 15:34:18 +0700 Subject: [PATCH] Adds SSL Properties and refactors --- project.clj | 4 +- src/ziggurat/config.clj | 59 +++++++++----- test/ziggurat/config_test.clj | 142 ++++++++++++++++++---------------- 3 files changed, 116 insertions(+), 89 deletions(-) diff --git a/project.clj b/project.clj index 64a42a61..92032332 100644 --- a/project.clj +++ b/project.clj @@ -54,7 +54,9 @@ [ch.qos.logback.contrib/logback-jackson "0.1.5"] [net.logstash.logback/logstash-logback-encoder "6.6"] [clj-commons/iapetos "0.1.9"] - [org.apache.commons/commons-pool2 "2.11.1"]] + [org.apache.commons/commons-pool2 "2.11.1"] + [com.github.jnr/jffi "1.3.12"] + [com.github.jnr/jnr-unixsocket "0.38.21"]] :deploy-repositories [["clojars" {:url "https://clojars.org/repo" :username :env/clojars_username :password :env/clojars_password diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj index 663ed020..8cc427ab 100644 --- a/src/ziggurat/config.clj +++ b/src/ziggurat/config.clj @@ -6,7 +6,7 @@ [mount.core :refer [defstate]] [ziggurat.util.java-util :as util]) (:import (java.util Properties) - [org.apache.kafka.common.config SaslConfigs] + [org.apache.kafka.common.config SaslConfigs SslConfigs] [org.apache.kafka.clients CommonClientConfigs]) (:gen-class :methods @@ -14,6 +14,8 @@ ^{:static true} [getIn [java.lang.Iterable] Object]] :name tech.gojek.ziggurat.internal.Config)) +(def DEFAULT-LOGIN-CALLBACK-HANDLER "io.gtflabs.kafka.security.oauthbearer.kubernetes.PodLoginCallbackHandler") + (def config-file "config.edn") (def default-config @@ -201,9 +203,18 @@ (.setProperty p sk nv)))) p) -(defn create-jaas-properties - [user-name password login-module] - (format "%s required username=\"%s\" password=\"%s\";" login-module user-name password)) +(defn create-jaas-properties [user-name password login-module] + (let [username-str (if user-name (format " username=\"%s\"" user-name) "") + password-str (if password (format " password=\"%s\"" password) "") + credentials (str username-str password-str)] + (format "%s required%s;" login-module (if (empty? credentials) "" credentials)))) + +(defn- add-ssl-properties + [properties ssl-config-map] + (doto properties + (.put SslConfigs/SSL_TRUSTSTORE_LOCATION_CONFIG (:ssl-truststore-location ssl-config-map "/etc/kafka/certs/truststore.p12")) + (.put SslConfigs/SSL_TRUSTSTORE_PASSWORD_CONFIG (:ssl-truststore-password ssl-config-map))) + properties) (defn- add-jaas-properties [properties jaas-config] @@ -217,12 +228,11 @@ properties)) (defn- add-sasl-properties - [properties mechanism protocol] - (if (and (some? mechanism) (some? protocol)) - (doto properties - (.put SaslConfigs/SASL_MECHANISM mechanism) - (.put CommonClientConfigs/SECURITY_PROTOCOL_CONFIG protocol)) - properties)) + [properties mechanism protocol login-callback-handler] + (when (some? mechanism) (.put properties SaslConfigs/SASL_MECHANISM mechanism)) + (when (some? protocol) (.put properties CommonClientConfigs/SECURITY_PROTOCOL_CONFIG protocol)) + (when (some? login-callback-handler) (.put properties SaslConfigs/SASL_LOGIN_CALLBACK_HANDLER_CLASS login-callback-handler)) + properties) (defn build-ssl-properties [properties set-property-fn ssl-config-map] @@ -251,9 +261,10 @@ protocol (get ssl-config-map :protocol)] (if (true? ssl-configs-enabled) (as-> properties pr - (add-jaas-properties pr jaas-config) - (add-sasl-properties pr mechanism protocol) - (reduce-kv set-property-fn pr ssl-config-map)) + (add-ssl-properties pr ssl-config-map) + (add-jaas-properties pr jaas-config) + (add-sasl-properties pr mechanism protocol nil) + (reduce-kv set-property-fn pr ssl-config-map)) properties))) (defn build-sasl-properties @@ -275,15 +286,16 @@ :password <> :login-module <>}}} " - (let [sasl-configs-enabled (:enabled sasl-config-map) - jaas-config (get sasl-config-map :jaas) - mechanism (get sasl-config-map :mechanism) - protocol (get sasl-config-map :protocol)] + (let [sasl-configs-enabled (:enabled sasl-config-map) + jaas-config (get sasl-config-map :jaas) + mechanism (get sasl-config-map :mechanism "OAUTHBEARER") + protocol (get sasl-config-map :protocol "SASL_SSL") + login-callback-handler (get sasl-config-map :login-callback-handler DEFAULT-LOGIN-CALLBACK-HANDLER)] (if (true? sasl-configs-enabled) (as-> properties pr - (add-jaas-properties pr jaas-config) - (add-sasl-properties pr mechanism protocol) - (reduce-kv set-property-fn pr sasl-config-map)) + (add-jaas-properties pr jaas-config) + (add-sasl-properties pr mechanism protocol login-callback-handler) + (reduce-kv set-property-fn pr sasl-config-map)) properties))) (defn build-properties @@ -324,3 +336,10 @@ (defn get-channel-retry-count [topic-entity channel] (:count (channel-retry-config topic-entity channel))) + +;; 1. Bump up kafka version to 3.7.0 +;; 2. Introduce changes for sasl and ssl properties: +;; 2.1 + + +;;ZIGGURAT_ \ No newline at end of file diff --git a/test/ziggurat/config_test.clj b/test/ziggurat/config_test.clj index 581056df..21c38d3d 100644 --- a/test/ziggurat/config_test.clj +++ b/test/ziggurat/config_test.clj @@ -155,10 +155,10 @@ (mount/stop))))) (deftest test-build-properties - (let [config-mapping-table (merge consumer-config-mapping-table - producer-config-mapping-table - streams-config-mapping-table) - set-all-property (partial set-property config-mapping-table) + (let [config-mapping-table (merge consumer-config-mapping-table + producer-config-mapping-table + streams-config-mapping-table) + set-all-property (partial set-property config-mapping-table) build-all-config-properties (partial build-properties set-all-property)] (testing "all valid kafka configs" (let [config-map {:auto-offset-reset :latest @@ -187,20 +187,20 @@ (is (= auto-commit-interval-ms "NOT FOUND")) (is (= commit-interval-ms "5000")))) (testing "mapping table for backward compatibility" - (let [config-map {:auto-offset-reset-config "latest" - :changelog-topic-replication-factor 2 - :commit-interval-ms 20000 - :consumer-group-id "foo" - :default-api-timeout-ms-config 3000 - :default-key-serde "key-serde" - :default-value-serde "value-serde" - :key-deserializer-class-config "key-deserializer" - :key-serializer-class "key-serializer" - :retries-config 5 - :session-timeout-ms-config 4000 - :stream-threads-count 4 - :value-deserializer-class-config "value-deserializer" - :value-serializer-class "value-serializer"} + (let [config-map {:auto-offset-reset-config "latest" + :changelog-topic-replication-factor 2 + :commit-interval-ms 20000 + :consumer-group-id "foo" + :default-api-timeout-ms-config 3000 + :default-key-serde "key-serde" + :default-value-serde "value-serde" + :key-deserializer-class-config "key-deserializer" + :key-serializer-class "key-serializer" + :retries-config 5 + :session-timeout-ms-config 4000 + :stream-threads-count 4 + :value-deserializer-class-config "value-deserializer" + :value-serializer-class "value-serializer"} props (build-all-config-properties config-map) auto-offset-reset (.getProperty props "auto.offset.reset") auto-commit-interval-ms (.getProperty props "auto.commit.interval.ms") @@ -238,18 +238,19 @@ :poll-timeout-ms-config 10000} props (build-all-config-properties config-map)] (doall - (map (fn [[k _]] - (let [string-key (str/replace (name k) #"-" ".") - not-found "NOT FOUND!" - v (.getProperty props string-key not-found)] - (is (= v not-found)))) - config-map)))) + (map (fn [[k _]] + (let [string-key (str/replace (name k) #"-" ".") + not-found "NOT FOUND!" + v (.getProperty props string-key not-found)] + (is (= v not-found)))) + config-map)))) (testing "should set ssl properties for streams if enabled is set to true" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] - (let [streams-config-map {:auto-offset-reset :latest - :group-id "foo"} + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password" + :ssl-truststore-password "some-password-2"})] + (let [streams-config-map {:auto-offset-reset :latest + :group-id "foo"} props (build-streams-config-properties streams-config-map) auto-offset-reset (.getProperty props "auto.offset.reset") group-id (.getProperty props "group.id") @@ -260,9 +261,10 @@ (is (= ssl-ks-location "/some/location")) (is (= ssl-ks-password "some-password"))))) (testing "should set ssl properties for consumer API if enabled is set to true" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password" + :ssl-truststore-password "some-password-2"})] (let [streams-config-map {:max-poll-records 500 :enable-auto-commit true} props (build-consumer-config-properties streams-config-map) @@ -270,31 +272,32 @@ enable-auto-comit (.getProperty props "enable.auto.commit") ssl-ks-location (.getProperty props "ssl.keystore.location") ssl-ks-password (.getProperty props "ssl.keystore.password")] - (is (= max-poll-records "500")) + (is (= max-poll-records "500")) (is (= enable-auto-comit "true")) (is (= ssl-ks-location "/some/location")) (is (= ssl-ks-password "some-password"))))) (testing "should set ssl properties for producer API if enabled is set to true" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] - (let [streams-config-map {:batch.size 500 - :acks 1} + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password" + :ssl-truststore-password "some-password-2"})] + (let [streams-config-map {:batch.size 500 + :acks 1} props (build-producer-config-properties streams-config-map) batch-size (.getProperty props "batch.size") acks (.getProperty props "acks") ssl-ks-location (.getProperty props "ssl.keystore.location") ssl-ks-password (.getProperty props "ssl.keystore.password")] - (is (= batch-size "500")) + (is (= batch-size "500")) (is (= acks "1")) (is (= ssl-ks-location "/some/location")) (is (= ssl-ks-password "some-password"))))) (testing "should not set ssl properties for streams if eenabled is set to false" - (with-redefs [ssl-config (constantly {:enabled false + (with-redefs [ssl-config (constantly {:enabled false :ssl-keystore-location "/some/location" :ssl-keystore-password "some-password"})] - (let [streams-config-map {:auto-offset-reset :latest - :group-id "foo"} + (let [streams-config-map {:auto-offset-reset :latest + :group-id "foo"} props (build-streams-config-properties streams-config-map) auto-offset-reset (.getProperty props "auto.offset.reset") group-id (.getProperty props "group.id") @@ -305,10 +308,11 @@ (is (nil? ssl-ks-location)) (is (nil? ssl-ks-password))))) (testing "ssl properties from streams config map overrides the ssl properties provided in [:ziggurat :ssl]" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] - (let [streams-config-map {:auto-offset-reset :latest + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password" + :ssl-truststore-password "some-password-2"})] + (let [streams-config-map {:auto-offset-reset :latest :ssl-keystore-location "/some/different/location" :ssl-keystore-password "different-password"} props (build-streams-config-properties streams-config-map) @@ -319,45 +323,47 @@ (is (= ssl-ks-location "/some/different/location")) (is (= ssl-ks-password "different-password"))))) (testing "ssl properties create jaas template from the map provided in [:ziggurat :ssl :jaas]" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password" - :mechanism "SCRAM-SHA-512" - :jaas {:username "myuser" - :password "mypassword" - :login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})] - (let [streams-config-map {:auto-offset-reset :latest} + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password" + :ssl-truststore-password "some-password-2" + :mechanism "SCRAM-SHA-512" + :jaas {:username "myuser" + :password "mypassword" + :login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})] + (let [streams-config-map {:auto-offset-reset :latest} props (build-streams-config-properties streams-config-map) auto-offset-reset (.getProperty props "auto.offset.reset") ssl-ks-location (.getProperty props "ssl.keystore.location") ssl-ks-password (.getProperty props "ssl.keystore.password") sasl-jaas-config (.getProperty props "sasl.jaas.config")] (is (= auto-offset-reset "latest")) - (is (= ssl-ks-location "/some/location")) - (is (= ssl-ks-password "some-password")) + (is (= ssl-ks-location "/some/location")) + (is (= ssl-ks-password "some-password")) (is (= sasl-jaas-config (create-jaas-properties "myuser" "mypassword" "org.apache.kafka.common.security.scram.ScramLoginModule")))))) (testing "ssl properties DO NOT create jaas template if no value is provided for key sequence [:ziggurat :ssl :jaas]" - (with-redefs [ssl-config (constantly {:enabled true - :ssl-keystore-location "/some/location" - :ssl-keystore-password "some-password"})] - (let [streams-config-map {:auto-offset-reset :latest} + (with-redefs [ssl-config (constantly {:enabled true + :ssl-keystore-location "/some/location" + :ssl-keystore-password "some-password" + :ssl-truststore-password "some-password-2"})] + (let [streams-config-map {:auto-offset-reset :latest} props (build-streams-config-properties streams-config-map) auto-offset-reset (.getProperty props "auto.offset.reset") ssl-ks-location (.getProperty props "ssl.keystore.location") ssl-ks-password (.getProperty props "ssl.keystore.password") sasl-jaas-config (.getProperty props "sasl.jaas.config")] (is (= auto-offset-reset "latest")) - (is (= ssl-ks-location "/some/location")) - (is (= ssl-ks-password "some-password")) + (is (= ssl-ks-location "/some/location")) + (is (= ssl-ks-password "some-password")) (is (nil? sasl-jaas-config))))) (testing "sasl properties create jaas template from the map provided in [:ziggurat :sasl :jaas]" - (with-redefs [sasl-config (constantly {:enabled true - :protocol "SASL_PLAINTEXT" + (with-redefs [sasl-config (constantly {:enabled true + :protocol "SASL_PLAINTEXT" :mechanism "SCRAM-SHA-256" - :jaas {:username "myuser" - :password "mypassword" - :login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})] - (let [streams-config-map {:auto-offset-reset :latest} + :jaas {:username "myuser" + :password "mypassword" + :login-module "org.apache.kafka.common.security.scram.ScramLoginModule"}})] + (let [streams-config-map {:auto-offset-reset :latest} props (build-streams-config-properties streams-config-map) auto-offset-reset (.getProperty props "auto.offset.reset") sasl-jaas-config (.getProperty props "sasl.jaas.config")