Structured logging with ziggurat (#238)
* [Anmol|Shubhang] Add Structured logging capability

* Add test for setting decoder when using json format

* Remove log4j.test.xml

* Removed unused code

* Add mdc to text log

* Excluding cambium macros from coverage

* Quoting the cambium libs in cloverage

* Add default log format

* Change log level to error

* Handling decreased line coverage

* changes log formats and removes timezone block

* moves set-properties-for-structured-logging inside initialize-config

* Organize imports

* Structuring ziggurat logs (#239)

* Add structured logs to ziggurat logs

* Fix linting

* changes log formats and removes timezone block

* adds logging context to mapper flow and rabbitmq consume flow

* Resolve PR comments

Co-authored-by: Anmol Vijaywargiya <[email protected]>
Co-authored-by: shubhang.balkundi <[email protected]>

* Updated changelog and project version for 4.2.0

Co-authored-by: Anmol Vijaywargiya <[email protected]>
Co-authored-by: shubhang.balkundi <[email protected]>
3 people authored Sep 3, 2021
1 parent c8343fb commit baaee63
Showing 15 changed files with 191 additions and 127 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,11 @@

All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).

## 4.2.0
- Enabled structured logging via [cambium](https://cambium-clojure.github.io/).
- Replaced log4j with logback as the default SLF4J implementation.
- Added structured logs throughout the ziggurat codebase.
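
The cambium log macros take an optional leading context map whose entries become structured fields (MDC) on the log event. A minimal sketch of typical usage - not taken from the ziggurat codebase; the namespace, keys, and values are illustrative:

```clojure
(ns my-app.core
  (:require [cambium.core :as clog]))

;; The leading map is the logging context: with :log-format "json",
;; its entries appear as top-level fields in the emitted log line.
(clog/info {:order-id "o-123" :status "processed"} "Order processed")
```

With the text format, the same context is rendered through the `%mdc` conversion word in the pattern layout instead.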

## 4.1.0
- Changed the code for consuming from RabbitMQ - on an exception during de-serialization, the message is sent to the dead-set
queues and NOT re-queued, as it was in previous versions.
24 changes: 18 additions & 6 deletions project.clj
Expand Up @@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))

(defproject tech.gojek/ziggurat "4.1.0"
(defproject tech.gojek/ziggurat "4.2.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
@@ -30,8 +30,6 @@
[io.opentracing.contrib/opentracing-rabbitmq-client "0.1.11" :exclusions [com.rabbitmq/amqp-client]]
[org.apache.httpcomponents/fluent-hc "4.5.13"]
[org.apache.kafka/kafka-streams "2.8.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.logging.log4j/log4j-core "2.14.1"]
[org.apache.logging.log4j/log4j-slf4j-impl "2.14.1"]
[org.clojure/clojure "1.10.3"]
[org.clojure/tools.logging "1.1.0"]
[nrepl/nrepl "0.8.3"]
@@ -53,7 +51,14 @@
com.fasterxml.jackson.core/jackson-core
com.fasterxml.jackson.dataformat/jackson-dataformat-smile
com.fasterxml.jackson.dataformat/jackson-dataformat-cbor]]
[metosin/ring-swagger-ui "3.46.0"]]
[metosin/ring-swagger-ui "3.46.0"]
[cambium/cambium.core "1.1.0"]
[cambium/cambium.codec-cheshire "1.0.0"]
[cambium/cambium.logback.json "0.4.4"]
[ch.qos.logback/logback-classic "1.2.3"]
[ch.qos.logback.contrib/logback-json-classic "0.1.5"]
[ch.qos.logback.contrib/logback-jackson "0.1.5"]
[net.logstash.logback/logstash-logback-encoder "6.6"]]
:deploy-repositories [["clojars" {:url "https://clojars.org/repo"
:username :env/clojars_username
:password :env/clojars_password
@@ -67,14 +72,21 @@
:global-vars {*warn-on-reflection* true}
:pedantic? :abort}
:test {:java-source-paths ["src/com" "test/com"]
:jvm-opts ["-Dlog4j.configurationFile=resources/log4j2.test.xml"]
:jvm-opts ["-Dlogback.configurationFile=resources/logback.test.xml"]
:dependencies [[com.google.protobuf/protobuf-java "3.17.0"]
[junit/junit "4.13.2"]
[org.hamcrest/hamcrest-core "2.2"]
[org.apache.kafka/kafka-streams "2.8.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.8.0" :classifier "test"]
[org.clojure/test.check "1.1.0"]]
:plugins [[lein-cloverage "1.0.13" :exclusions [org.clojure/clojure]]]
:plugins [[lein-cloverage "1.2.2" :exclusions [org.clojure/clojure]]]
:cloverage {:exclude-call ['cambium.core/info
'cambium.core/debug
'cambium.core/trace
'cambium.core/warn
'cambium.core/error
'cambium.core/fatal
'cambium.core/with-logging-context]}
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[lein-ancient "0.6.15"]
[lein-cljfmt "0.6.4"]
3 changes: 2 additions & 1 deletion resources/config.test.edn
@@ -85,4 +85,5 @@
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
:tracer {:enabled [true :bool]
:custom-provider ""}
:new-relic {:report-errors false}}}
:new-relic {:report-errors false}
:log-format "text"}}
21 changes: 0 additions & 21 deletions resources/log4j2.test.xml

This file was deleted.

35 changes: 35 additions & 0 deletions resources/logback.test.xml
@@ -0,0 +1,35 @@
<configuration>
<property name="type" value="${ZIGGURAT_LOG_FORMAT:-text}"/>

<if condition='property("type").equals("json")'>
<then>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="cambium.logback.json.FlatJsonLayout">
<jsonFormatter class="ch.qos.logback.contrib.jackson.JacksonJsonFormatter">
<prettyPrint>false</prettyPrint>
</jsonFormatter>
<timestampFormat>yyyy-MM-dd'T'HH:mm:ss.SSS'Z'</timestampFormat>
<appendLineSeparator>true</appendLineSeparator>
</layout>
</encoder>
</appender>
</then>
</if>

<if condition='property("type").equals("text")'>
<then>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>
[%-5level] %d [%t] %c:%M: %m { %mdc }%n
</pattern>
</layout>
</appender>
</then>
</if>

<root level="error">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
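
The `type` property above resolves from the `ZIGGURAT_LOG_FORMAT` environment variable and defaults to `text`, so the output format can be switched per-process without touching the XML. A small shell sketch of the substitution rule (the variable name comes from the config above; everything else is illustrative):

```shell
# logback resolves ${ZIGGURAT_LOG_FORMAT:-text}: the variable's value
# if it is set, otherwise the default "text".
format="${ZIGGURAT_LOG_FORMAT:-text}"
echo "log format: $format"

# Opt in to flat JSON log lines for this process:
export ZIGGURAT_LOG_FORMAT=json
format="${ZIGGURAT_LOG_FORMAT:-text}"
echo "log format: $format"   # prints "log format: json"
```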
3 changes: 2 additions & 1 deletion src/ziggurat/config.clj
@@ -39,7 +39,8 @@
:http-server {:middlewares {:swagger {:enabled false}}
:port 8080
:thread-count 100}
:new-relic {:report-errors false}}})
:new-relic {:report-errors false}
:log-format "text"}})

(defn- interpolate-val [val app-name]
(if (string? val)
21 changes: 14 additions & 7 deletions src/ziggurat/init.clj
@@ -1,9 +1,11 @@
(ns ziggurat.init
"Contains the entry point for your application."
(:require [clojure.tools.logging :as log]
(:require [cambium.codec :as codec]
[cambium.logback.json.flat-layout :as flat]
[clojure.set :as set]
[clojure.tools.logging :as log]
[mount.core :as mount :refer [defstate]]
[schema.core :as s]
[clojure.set :as set]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.messaging.connection :as messaging-connection :refer [connection]]
[ziggurat.messaging.consumer :as messaging-consumer]
@@ -44,6 +46,10 @@
(start-rabbitmq-connection args)
(messaging-producer/make-queues (event-routes args)))

(defn- set-properties-for-structured-logging []
(if (= (:log-format (ziggurat-config)) "json")
(flat/set-decoder! codec/destringify-val)))

(defn start-kafka-producers []
(start* #{#'kafka-producers}))

@@ -130,7 +136,8 @@
((fnk (get valid-modes-fns mode)) args)))))

(defn initialize-config []
(start* #{#'config/config}))
(start* #{#'config/config})
(set-properties-for-structured-logging))

(defn start-common-states []
(start* #{#'metrics/statsd-reporter
@@ -188,9 +195,9 @@
(defn- validate-routes-against-config
([routes route-type]
(doseq [[topic-entity handler-map] routes]
(let [route-config (-> (ziggurat-config)
(get-in [route-type topic-entity]))
channels (-> handler-map (dissoc :handler-fn) (keys) (set))
(let [route-config (-> (ziggurat-config)
(get-in [route-type topic-entity]))
channels (-> handler-map (dissoc :handler-fn) (keys) (set))
config-channels (-> (ziggurat-config)
(get-in [route-type topic-entity :channels])
(keys)
@@ -210,7 +217,7 @@
(validate-routes-against-config batch-routes :batch-routes)))

(defn- derive-modes [stream-routes batch-routes actor-routes]
(let [base-modes [:management-api :worker]]
(let [base-modes [:management-api :worker]]
(if (and (nil? stream-routes) (nil? batch-routes))
(throw (IllegalArgumentException. "Either :stream-routes or :batch-routes should be present in init args")))
(cond-> base-modes
6 changes: 4 additions & 2 deletions src/ziggurat/kafka_consumer/consumer_driver.clj
@@ -6,12 +6,14 @@
[clojure.tools.logging :as log]
[ziggurat.kafka-consumer.executor-service :refer :all]
[mount.core :as mount]
[ziggurat.metrics :as metrics])
[ziggurat.metrics :as metrics]
[cambium.core :as clog])
(:import (java.util.concurrent ExecutorService RejectedExecutionException)
(org.apache.kafka.clients.consumer Consumer)))

(defn- start-polling-with-consumer
[consumer init-arg topic-entity consumer-config]

(let [message-poller (cast Runnable #(ch/poll-for-messages consumer (:handler-fn init-arg) topic-entity consumer-config))]
(when message-poller
(try
@@ -43,7 +45,7 @@
(defn- stop-consumers [consumer-groups]
(do (log/info "stopping consumers")
(doseq [[topic-entity consumers] consumer-groups]
(log/info "Stopping threads for consumer group: " topic-entity)
(clog/info {:consumer-group topic-entity} (str "Stopping threads for consumer group: " topic-entity))
(doseq [consumer consumers]
(.wakeup ^Consumer consumer)))))

44 changes: 23 additions & 21 deletions src/ziggurat/kafka_consumer/consumer_handler.clj
@@ -3,7 +3,8 @@
[ziggurat.config :refer :all]
[ziggurat.messaging.producer :as producer]
[ziggurat.message-payload :refer [map->MessagePayload]]
[ziggurat.metrics :as metrics])
[ziggurat.metrics :as metrics]
[cambium.core :as clog])
(:import (org.apache.kafka.common.errors WakeupException)
(java.time Duration Instant)
(tech.gojek.ziggurat.internal InvalidReturnTypeException)
@@ -26,8 +27,8 @@
(producer/retry batch-payload))
([batch current-retry-count topic-entity]
(when (pos? (count batch))
(let [message (map->MessagePayload {:message batch
:retry-count current-retry-count
(let [message (map->MessagePayload {:message batch
:retry-count current-retry-count
:topic-entity topic-entity})]
(producer/retry message)))))

@@ -49,17 +50,17 @@
batch-size (count batch)]
(try
(when (not-empty batch)
(log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size)
(let [start-time (Instant/now)
result (batch-handler batch)
time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))]
(clog/info {:batch-size batch-size} (format "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size))
(let [start-time (Instant/now)
result (batch-handler batch)
time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))]
(validate-batch-processing-result result)
(let [messages-to-be-retried (:retry result)
to-be-retried-count (count messages-to-be-retried)
skip-count (count (:skip result))
success-count (- batch-size (+ to-be-retried-count skip-count))]
(log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n"
topic-entity success-count skip-count to-be-retried-count)

(clog/info {:messages-successfully-processed success-count :messages-skipped skip-count :messages-to-be-retried to-be-retried-count} (format "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" topic-entity success-count skip-count to-be-retried-count))
(publish-batch-process-metrics topic-entity batch-size success-count skip-count to-be-retried-count time-taken-in-millis)
(retry messages-to-be-retried current-retry-count topic-entity))))
(catch InvalidReturnTypeException e
@@ -78,16 +79,17 @@

(defn poll-for-messages
[^Consumer consumer handler-fn topic-entity consumer-config]
(try
(loop [records []]
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload)))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e
(log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
(catch Exception e
(log/errorf e "Exception while polling for messages for: %s" topic-entity))
(finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
(.close consumer)))))
(clog/with-logging-context {:consumer-group topic-entity}
(try
(loop [records []]
(when (not-empty records)
(let [batch-payload (create-batch-payload records topic-entity)]
(process handler-fn batch-payload)))
(recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
(catch WakeupException e
(log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
(catch Exception e
(log/errorf e "Exception while polling for messages for: %s" topic-entity))
(finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
(.close consumer))))))
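
The `with-logging-context` wrapper added here scopes the `:consumer-group` key over the entire polling loop, so every log line emitted inside it - including those from `process` and the catch/finally blocks - carries that field. Conceptually (a standalone sketch, not ziggurat code):

```clojure
(require '[cambium.core :as clog])

;; Context established here is merged into the MDC for the dynamic
;; extent of the body, then restored when the body exits.
(clog/with-logging-context {:consumer-group :my-topic}
  (clog/info "polling")         ; log line carries :consumer-group
  (clog/error "poll failed"))   ; so does this one
```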

