Skip to content

Commit

Permalink
Merge pull request #219 from gojek/support-all-kafka-stream-consumer-…
Browse files Browse the repository at this point in the history
…configs

support all consumer, producer and stream configurations
  • Loading branch information
mjayprateek authored Apr 28, 2021
2 parents 717a7dc + 86ed386 commit 1ccb2f0
Show file tree
Hide file tree
Showing 10 changed files with 268 additions and 350 deletions.
5 changes: 4 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
# Change Log
# Changelog

All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).

## 3.13.0
- Supports all of the official Kafka configurations for [Streams API](https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html), [Consumer API](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) and [Producer API](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html)

## 3.12.0
- Uses Kafka Streams client 2.7.0
- Introduces default.api.timeout.ms for Kafka Consumer API
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,8 @@ Ziggurat Config | Default Value | Description | Mandatory?

## Configuration

As of Ziggurat version 3.13.0, all the official Kafka configurations for [Streams API](https://docs.confluent.io/platform/current/installation/configuration/streams-configs.html), [Consumer API](https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html) and [Producer API](https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html) are supported.

All Ziggurat configs should be in your `clonfig` `config.edn` under the `:ziggurat` key.

```clojure
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "3.12.0"
(defproject tech.gojek/ziggurat "3.13.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
Expand Down
75 changes: 74 additions & 1 deletion src/ziggurat/config.clj
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
(ns ziggurat.config
(:require [clojure.edn :as edn]
[clojure.java.io :as io]
[clojure.string :as str]
[clonfig.core :as clonfig]
[mount.core :refer [defstate]]
[ziggurat.util.java-util :as util])
(:import (java.util Properties))
(:gen-class
:methods [^{:static true} [get [String] Object]
^{:static true} [getIn [java.lang.Iterable] Object]]
Expand Down Expand Up @@ -81,7 +83,7 @@

(defn statsd-config []
(let [cfg (ziggurat-config)]
(get cfg :statsd (:datadog cfg)))) ;; TODO: remove datadog in the future
(get cfg :statsd (:datadog cfg)))) ;; TODO: remove datadog in the future

(defn get-in-config
([ks]
Expand All @@ -108,3 +110,74 @@
(defn -get
  "Looks up the entry of `config` stored under the keyword form of `key`
  and returns it after passing it through `java-response`."
  [^String key]
  (-> config
      (get (keyword key))
      (java-response)))

;; Key translation table used when building consumer properties.
;; Each entry maps a key as written in the config map (left) to the key whose
;; name becomes the property name (right) once `to-string-key` has replaced
;; every "-" with "." -- e.g. :consumer-group-id -> :group-id -> "group.id".
;; Keys that are absent from this table are passed through unchanged.
(def consumer-config-mapping-table
{:auto-offset-reset-config :auto-offset-reset
:commit-interval-ms :auto-commit-interval-ms
:consumer-group-id :group-id
:default-api-timeout-ms-config :default-api-timeout-ms
:key-deserializer-class-config :key-deserializer
:session-timeout-ms-config :session-timeout-ms
:value-deserializer-class-config :value-deserializer})

;; Key translation table used when building producer properties.
;; Each entry maps a key as written in the config map (left) to the key whose
;; name becomes the property name (right) once `to-string-key` has replaced
;; every "-" with "." -- e.g. :key-serializer-class -> :key-serializer ->
;; "key.serializer". Keys that are absent from this table are passed through
;; unchanged.
(def producer-config-mapping-table
{:key-serializer-class :key-serializer
:retries-config :retries
:value-serializer-class :value-serializer})

;; Key translation table used when building streams properties.
;; Each entry maps a key as written in the config map (left) to the key whose
;; name becomes the property name (right) once `to-string-key` has replaced
;; every "-" with "." -- e.g. :stream-threads-count -> :num-stream-threads ->
;; "num.stream.threads". Keys that are absent from this table are passed
;; through unchanged.
(def streams-config-mapping-table
{:auto-offset-reset-config :auto-offset-reset
:default-api-timeout-ms-config :default-api-timeout-ms
:changelog-topic-replication-factor :replication-factor
:session-timeout-ms-config :session-timeout-ms
:stream-threads-count :num-stream-threads})

;; Config keys that `set-property` skips: an entry whose key appears in this
;; vector is never written to the Properties object being built.
;; NOTE(review): judging by the name these are Ziggurat-only settings rather
;; than Kafka client options -- confirm before adding or removing keys.
(def ^:private non-kafka-config-keys
[:channels
:consumer-type
:input-topics
:join-cfg
:oldest-processed-message-in-s
:origin-topic
:poll-timeout-ms-config
:producer
:thread-count])

(defn- to-list
  "Wraps `s` in a one-element list, or returns an empty list when `s` is
  empty (nil, an empty string or an empty collection)."
  [s]
  (if (seq s)
    (list s)
    (list)))

(defn- to-string-key
  "Returns the property name for config key `k`.

  `k` is first translated through `mapping-table` (falling back to `k` itself
  when there is no entry); the name of the resulting key is then returned
  with every \"-\" replaced by \".\"."
  [mapping-table k]
  (let [translated (get mapping-table k k)]
    (str/replace (name translated) #"-" ".")))

(defn- normalize-value
  "Returns `v` as a trimmed string: the name of `v` when it is a keyword,
  otherwise `(str v)`."
  [v]
  (let [text (if (keyword? v)
               (name v)
               (str v))]
    (str/trim text)))

(defn set-property
  "Writes one config entry into the Properties object `p` and returns `p`.

  Nothing is written when `k` is listed in `non-kafka-config-keys`, or when
  either the derived property name or the normalized value is an empty
  string. Otherwise the property name comes from `to-string-key` (using
  `mapping-table`) and the value from `normalize-value`.

  The argument order (accumulator, key, value after the partially applied
  `mapping-table`) matches what `reduce-kv` passes to its reducing function."
  [mapping-table p k v]
  (let [excluded? (some #(= k %) non-kafka-config-keys)]
    (when-not excluded?
      (let [property-name  (to-string-key mapping-table k)
            property-value (normalize-value v)]
        ;; Both strings must be non-empty for the property to be set.
        (when (and (seq property-name) (seq property-value))
          (.setProperty p property-name property-value))))
    p))

(defn build-properties
  "Folds the map `m` into a fresh `java.util.Properties` and returns it.

  `set-property-fn` is called once per entry as
  `(set-property-fn properties k v)` and must return the accumulator."
  [set-property-fn m]
  (let [props (Properties.)]
    (reduce-kv set-property-fn props m)))

;; Takes a config map and returns a java.util.Properties built by
;; `build-properties`, translating keys through `consumer-config-mapping-table`.
(def build-consumer-config-properties (partial build-properties (partial set-property consumer-config-mapping-table)))

;; Takes a config map and returns a java.util.Properties built by
;; `build-properties`, translating keys through `producer-config-mapping-table`.
(def build-producer-config-properties (partial build-properties (partial set-property producer-config-mapping-table)))

;; Takes a config map and returns a java.util.Properties built by
;; `build-properties`, translating keys through `streams-config-mapping-table`.
(def build-streams-config-properties (partial build-properties (partial set-property streams-config-mapping-table)))
42 changes: 9 additions & 33 deletions src/ziggurat/kafka_consumer/consumer.clj
Original file line number Diff line number Diff line change
@@ -1,48 +1,24 @@
(ns ziggurat.kafka-consumer.consumer
(:require [clojure.tools.logging :as log]
[ziggurat.kafka-consumer.consumer-handler :refer :all]
[ziggurat.config :as cfg]
[ziggurat.util.map :as umap])
(:import (java.util Map Properties)
(org.apache.kafka.clients.consumer KafkaConsumer ConsumerConfig)
(java.util.regex Pattern)))
(:import (java.util.regex Pattern)
(org.apache.kafka.clients.consumer KafkaConsumer)))

(def default-consumer-config
{:commit-interval-ms 15000
:max-poll-records 500
:session-timeout-ms-config 60000
:enable-auto-commit true
:default-api-timeout-ms-config 60000
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
{:commit-interval-ms 15000
:session-timeout-ms-config 60000
:default-api-timeout-ms-config 60000
:key-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"})

(defn- build-consumer-properties-map
[{:keys [bootstrap-servers
consumer-group-id
max-poll-records
session-timeout-ms-config
commit-interval-ms
enable-auto-commit
key-deserializer-class-config
value-deserializer-class-config
default-api-timeout-ms-config]}]
(doto (Properties.)
(.putAll {ConsumerConfig/BOOTSTRAP_SERVERS_CONFIG bootstrap-servers
ConsumerConfig/GROUP_ID_CONFIG consumer-group-id
ConsumerConfig/MAX_POLL_RECORDS_CONFIG (int max-poll-records)
ConsumerConfig/SESSION_TIMEOUT_MS_CONFIG (int session-timeout-ms-config)
ConsumerConfig/ENABLE_AUTO_COMMIT_CONFIG enable-auto-commit
ConsumerConfig/AUTO_COMMIT_INTERVAL_MS_CONFIG (int commit-interval-ms)
ConsumerConfig/KEY_DESERIALIZER_CLASS_CONFIG key-deserializer-class-config
ConsumerConfig/VALUE_DESERIALIZER_CLASS_CONFIG value-deserializer-class-config
ConsumerConfig/DEFAULT_API_TIMEOUT_MS_CONFIG (int default-api-timeout-ms-config)})))
(defn create-consumer
[topic-entity consumer-group-config]
(try
(let [merged-consumer-group-config (umap/deep-merge consumer-group-config default-consumer-config)
consumer (KafkaConsumer. ^Map (build-consumer-properties-map merged-consumer-group-config))
topic-pattern (Pattern/compile (:origin-topic merged-consumer-group-config))]
consumer (KafkaConsumer. (cfg/build-consumer-config-properties merged-consumer-group-config))
topic-pattern (Pattern/compile (:origin-topic merged-consumer-group-config))]
(.subscribe consumer topic-pattern)
consumer)
(catch Exception e
(log/error e "Exception received while creating Kafka Consumer for: " topic-entity))))

103 changes: 16 additions & 87 deletions src/ziggurat/producer.clj
Original file line number Diff line number Diff line change
Expand Up @@ -37,113 +37,42 @@
- key.serializer
- value.serializer
- max.in.flight.requests.per.connection
- enable.idempotencecd
- enable.idempotence
Please see [producer configs](http://kafka.apache.org/documentation.html#producerconfigs)
for a complete list of all producer configs available in Kafka."

(:require [ziggurat.config :refer [ziggurat-config]]
[clojure.tools.logging :as log]
(:refer-clojure :exclude [send])
(:require [clojure.tools.logging :as log]
[mount.core :refer [defstate]]
[camel-snake-kebab.core :as csk]
[ziggurat.config :refer [build-producer-config-properties ziggurat-config]]
[ziggurat.tracer :refer [tracer]]
[ziggurat.util.java-util :refer [get-key]]
[schema.core :as s])
(:import (org.apache.kafka.clients.producer KafkaProducer ProducerRecord ProducerConfig)
(java.util Properties)
(io.opentracing.contrib.kafka TracingKafkaProducer))
[ziggurat.util.java-util :refer [get-key]])
(:import (io.opentracing.contrib.kafka TracingKafkaProducer)
(org.apache.kafka.clients.producer KafkaProducer ProducerRecord))
(:gen-class
:name tech.gojek.ziggurat.internal.Producer
:methods [^{:static true} [send [String String Object Object] java.util.concurrent.Future]
^{:static true} [send [String String int Object Object] java.util.concurrent.Future]]))

(defn *implements-serializer?* [serializer-class]
(contains? (set (.getInterfaces (Class/forName serializer-class)))
(Class/forName "org.apache.kafka.common.serialization.Serializer")))

(def implements-serializer? (s/pred *implements-serializer?* 'implements-serializer?))

(s/defschema ProducerConfigSchema {(s/required-key :bootstrap-servers) s/Any
(s/optional-key :key-serializer-class) implements-serializer?
(s/optional-key :value-serializer-class) implements-serializer?
(s/optional-key :key-serializer) implements-serializer?
(s/optional-key :value-serializer) implements-serializer?
(s/optional-key :retries-config) s/Any
(s/optional-key :metadata-max-age) s/Any
(s/optional-key :reconnect-backoff-ms) s/Any
(s/optional-key :client-id) s/Any
(s/optional-key :metric-num-samples) s/Any
(s/optional-key :transaction-timeout) s/Any
(s/optional-key :retries) s/Any
(s/optional-key :retry-backoff-ms) s/Any
(s/optional-key :receive-buffer) s/Any
(s/optional-key :partitioner-class) s/Any
(s/optional-key :max-block-ms) s/Any
(s/optional-key :metrics-reporter-classes) s/Any
(s/optional-key :compression-type) s/Any
(s/optional-key :max-request-size) s/Any
(s/optional-key :delivery-timeout-ms) s/Any
(s/optional-key :metrics-sample-window-ms) s/Any
(s/optional-key :request-timeout-ms) s/Any
(s/optional-key :buffer-memory) s/Any
(s/optional-key :interceptor-classes) s/Any
(s/optional-key :linger-ms) s/Any
(s/optional-key :connections-max-idle-ms) s/Any
(s/optional-key :acks) s/Any
(s/optional-key :enable-idempotence) s/Any
(s/optional-key :metrics-recording-level) s/Any
(s/optional-key :transactional-id) s/Any
(s/optional-key :reconnect-backoff-max-ms) s/Any
(s/optional-key :client-dns-lookup) s/Any
(s/optional-key :max-in-flight-requests-per-connection) s/Any
(s/optional-key :send-buffer) s/Any
(s/optional-key :batch-size) s/Any})

(def valid-configs? (partial s/validate ProducerConfigSchema))

(def explain-str (partial s/explain ProducerConfigSchema))

(defn property->fn [field-name]
(let [raw-field-name (condp = field-name
:max-in-flight-requests-per-connection "org.apache.kafka.clients.producer.ProducerConfig/%s"
:retries-config "org.apache.kafka.clients.producer.ProducerConfig/RETRIES_CONFIG"
:key-serializer "org.apache.kafka.clients.producer.ProducerConfig/KEY_SERIALIZER_CLASS_CONFIG"
:value-serializer "org.apache.kafka.clients.producer.ProducerConfig/VALUE_SERIALIZER_CLASS_CONFIG"
"org.apache.kafka.clients.producer.ProducerConfig/%s_CONFIG")]
(->> field-name
csk/->SCREAMING_SNAKE_CASE_STRING
(format raw-field-name)
(read-string))))

(defn producer-properties [config-map]
(if (valid-configs? config-map)
(let [props (Properties.)]
(doseq [config-key (keys config-map)]
(.setProperty props
(eval (property->fn config-key))
(str (get config-map config-key))))
props)
(throw (ex-info (explain-str config-map) config-map))))
^{:static true} [send [String String int Object Object] java.util.concurrent.Future]]
:name tech.gojek.ziggurat.internal.Producer))

(defn producer-properties-map []
(reduce (fn [producer-map [stream-config-key stream-config]]
(let [producer-config (:producer stream-config)]
(if (some? producer-config)
(assoc producer-map stream-config-key (producer-properties producer-config))
(assoc producer-map stream-config-key (build-producer-config-properties producer-config))
producer-map)))
{}
(seq (:stream-router (ziggurat-config)))))

(declare kafka-producers)

(defstate kafka-producers
:start (if (not-empty (producer-properties-map))
(do (log/info "Starting Kafka producers ...")
(reduce (fn [producers [stream-config-key properties]]
(do (log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
(let [_ (println properties)
kp (KafkaProducer. properties)
_ (println kp)
tkp (TracingKafkaProducer. kp tracer)]
(assoc producers stream-config-key tkp))))
(log/debug "Constructing Kafka producer associated with [" stream-config-key "] ")
(let [kp (KafkaProducer. properties)
tkp (TracingKafkaProducer. kp tracer)]
(assoc producers stream-config-key tkp)))
{}
(seq (producer-properties-map))))
(log/info "No producers found. Can not initiate start."))
Expand Down
Loading

0 comments on commit 1ccb2f0

Please sign in to comment.