diff --git a/CHANGELOG.md b/CHANGELOG.md
index f8a59012..acd7d483 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,11 @@
All notable changes to this project will be documented in this file. This change log follows the conventions of [keepachangelog.com](http://keepachangelog.com/).
+## 4.2.0
+- Enabled structured logging via [cambium](https://cambium-clojure.github.io/).
+- Replaced log4j2 with logback as the default SLF4J implementation.
+- Added structured log statements throughout the Ziggurat codebase.
+
## 4.1.0
- Changed the code for consuming from RabbitMQ - on exception during de-serialization, message is sent to the dead-set
queues and NOT re-queued back in the queue like previous versions.
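
For context on the structured-logging entry above, here is a minimal sketch of what the cambium API looks like from an application's point of view (the namespace and handler below are illustrative, not part of this patch):

(ns example.app
  (:require [cambium.core :as clog]))

(defn handle-order [order-id]
  ;; the leading map is emitted as structured MDC fields alongside the message
  (clog/info {:order-id order-id :state "received"} "Processing order")
  ;; every log call inside the body carries :order-id in its context
  (clog/with-logging-context {:order-id order-id}
    (clog/debug "Validating payload")))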
diff --git a/project.clj b/project.clj
index 8897dad6..b49f467b 100644
--- a/project.clj
+++ b/project.clj
@@ -2,7 +2,7 @@
(cemerick.pomegranate.aether/register-wagon-factory!
"http" #(org.apache.maven.wagon.providers.http.HttpWagon.))
-(defproject tech.gojek/ziggurat "4.1.0"
+(defproject tech.gojek/ziggurat "4.2.0"
:description "A stream processing framework to build stateless applications on kafka"
:url "https://github.com/gojektech/ziggurat"
:license {:name "Apache License, Version 2.0"
@@ -30,8 +30,6 @@
[io.opentracing.contrib/opentracing-rabbitmq-client "0.1.11" :exclusions [com.rabbitmq/amqp-client]]
[org.apache.httpcomponents/fluent-hc "4.5.13"]
[org.apache.kafka/kafka-streams "2.8.0" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
- [org.apache.logging.log4j/log4j-core "2.14.1"]
- [org.apache.logging.log4j/log4j-slf4j-impl "2.14.1"]
[org.clojure/clojure "1.10.3"]
[org.clojure/tools.logging "1.1.0"]
[nrepl/nrepl "0.8.3"]
@@ -53,7 +51,14 @@
com.fasterxml.jackson.core/jackson-core
com.fasterxml.jackson.dataformat/jackson-dataformat-smile
com.fasterxml.jackson.dataformat/jackson-dataformat-cbor]]
- [metosin/ring-swagger-ui "3.46.0"]]
+ [metosin/ring-swagger-ui "3.46.0"]
+ [cambium/cambium.core "1.1.0"]
+ [cambium/cambium.codec-cheshire "1.0.0"]
+ [cambium/cambium.logback.json "0.4.4"]
+ [ch.qos.logback/logback-classic "1.2.3"]
+ [ch.qos.logback.contrib/logback-json-classic "0.1.5"]
+ [ch.qos.logback.contrib/logback-jackson "0.1.5"]
+ [net.logstash.logback/logstash-logback-encoder "6.6"]]
:deploy-repositories [["clojars" {:url "https://clojars.org/repo"
:username :env/clojars_username
:password :env/clojars_password
@@ -67,14 +72,21 @@
:global-vars {*warn-on-reflection* true}
:pedantic? :abort}
:test {:java-source-paths ["src/com" "test/com"]
- :jvm-opts ["-Dlog4j.configurationFile=resources/log4j2.test.xml"]
+ :jvm-opts ["-Dlogback.configurationFile=resources/logback.test.xml"]
:dependencies [[com.google.protobuf/protobuf-java "3.17.0"]
[junit/junit "4.13.2"]
[org.hamcrest/hamcrest-core "2.2"]
[org.apache.kafka/kafka-streams "2.8.0" :classifier "test" :exclusions [org.slf4j/slf4j-log4j12 log4j]]
[org.apache.kafka/kafka-clients "2.8.0" :classifier "test"]
[org.clojure/test.check "1.1.0"]]
- :plugins [[lein-cloverage "1.0.13" :exclusions [org.clojure/clojure]]]
+ :plugins [[lein-cloverage "1.2.2" :exclusions [org.clojure/clojure]]]
+ :cloverage {:exclude-call ['cambium.core/info
+ 'cambium.core/debug
+ 'cambium.core/trace
+ 'cambium.core/warn
+ 'cambium.core/error
+ 'cambium.core/fatal
+ 'cambium.core/with-logging-context]}
:repositories [["confluent-repo" "https://packages.confluent.io/maven/"]]}
:dev {:plugins [[lein-ancient "0.6.15"]
[lein-cljfmt "0.6.4"]
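
The dependency changes above swap the log4j2 artifacts for logback plus the cambium stack. As a hedged sketch, an application depending on this release gets the same stack transitively and would only pin the versions itself to override them (the project name below is made up; versions are the ones added in this patch):

(defproject example/app "0.1.0-SNAPSHOT"
  :dependencies [[tech.gojek/ziggurat "4.2.0"]
                 ;; optional explicit pins; Ziggurat already brings these in
                 [cambium/cambium.core "1.1.0"]
                 [cambium/cambium.codec-cheshire "1.0.0"]
                 [cambium/cambium.logback.json "0.4.4"]
                 [ch.qos.logback/logback-classic "1.2.3"]])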
diff --git a/resources/config.test.edn b/resources/config.test.edn
index 1b476b22..aa62fe8d 100644
--- a/resources/config.test.edn
+++ b/resources/config.test.edn
@@ -85,4 +85,5 @@
:value-deserializer-class-config "org.apache.kafka.common.serialization.ByteArrayDeserializer"}}
:tracer {:enabled [true :bool]
:custom-provider ""}
- :new-relic {:report-errors false}}}
+ :new-relic {:report-errors false}
+ :log-format "text"}}
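
The new :log-format key defaults to "text" (see src/ziggurat/config.clj below) and is read once at startup in src/ziggurat/init.clj. A hedged sketch of how an application would opt into JSON logs in its own config.edn (:app-name is illustrative):

{:ziggurat {:app-name   "example-app"
            ;; "text" is the default; "json" enables the flat JSON layout decoder
            :log-format "json"}}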
diff --git a/resources/log4j2.test.xml b/resources/log4j2.test.xml
deleted file mode 100644
index 9d8c6a86..00000000
--- a/resources/log4j2.test.xml
+++ /dev/null
@@ -1,15 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<Configuration>
-    <Appenders>
-        <Console name="ConsoleAppender" target="SYSTEM_OUT">
-            <PatternLayout>
-                <Pattern>[%-5level] %d [%t] %c:%M: %m%n</Pattern>
-            </PatternLayout>
-        </Console>
-    </Appenders>
-    <Loggers>
-        <Root level="info">
-            <AppenderRef ref="ConsoleAppender"/>
-        </Root>
-    </Loggers>
-</Configuration>
diff --git a/resources/logback.test.xml b/resources/logback.test.xml
new file mode 100644
index 00000000..dabd5dad
--- /dev/null
+++ b/resources/logback.test.xml
@@ -0,0 +1,22 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+    <appender name="STDOUT-JSON" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
+            <layout class="cambium.logback.json.FlatJsonLayout">
+                <jsonFormatter class="ch.qos.logback.contrib.jackson.JacksonJsonFormatter">
+                    <prettyPrint>false</prettyPrint>
+                </jsonFormatter>
+                <timestampFormat>yyyy-MM-dd'T'HH:mm:ss.SSS'Z'</timestampFormat>
+                <appendLineSeparator>true</appendLineSeparator>
+            </layout>
+        </encoder>
+    </appender>
+    <appender name="STDOUT-TEXT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>[%-5level] %d [%t] %c:%M: %m { %mdc }%n</pattern>
+        </encoder>
+    </appender>
+    <root level="info">
+        <appender-ref ref="STDOUT-TEXT"/>
+    </root>
+</configuration>
\ No newline at end of file
diff --git a/src/ziggurat/config.clj b/src/ziggurat/config.clj
index 3f479304..e1f00e0d 100644
--- a/src/ziggurat/config.clj
+++ b/src/ziggurat/config.clj
@@ -39,7 +39,8 @@
:http-server {:middlewares {:swagger {:enabled false}}
:port 8080
:thread-count 100}
- :new-relic {:report-errors false}}})
+ :new-relic {:report-errors false}
+ :log-format "text"}})
(defn- interpolate-val [val app-name]
(if (string? val)
diff --git a/src/ziggurat/init.clj b/src/ziggurat/init.clj
index f49a5c3b..d1cb4151 100644
--- a/src/ziggurat/init.clj
+++ b/src/ziggurat/init.clj
@@ -1,9 +1,11 @@
(ns ziggurat.init
"Contains the entry point for your application."
- (:require [clojure.tools.logging :as log]
+ (:require [cambium.codec :as codec]
+ [cambium.logback.json.flat-layout :as flat]
+ [clojure.set :as set]
+ [clojure.tools.logging :as log]
[mount.core :as mount :refer [defstate]]
[schema.core :as s]
- [clojure.set :as set]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.messaging.connection :as messaging-connection :refer [connection]]
[ziggurat.messaging.consumer :as messaging-consumer]
@@ -44,6 +46,10 @@
(start-rabbitmq-connection args)
(messaging-producer/make-queues (event-routes args)))
+(defn- set-properties-for-structured-logging []
+  (when (= (:log-format (ziggurat-config)) "json")
+    (flat/set-decoder! codec/destringify-val)))
+
(defn start-kafka-producers []
(start* #{#'kafka-producers}))
@@ -130,7 +136,8 @@
((fnk (get valid-modes-fns mode)) args)))))
(defn initialize-config []
- (start* #{#'config/config}))
+ (start* #{#'config/config})
+ (set-properties-for-structured-logging))
(defn start-common-states []
(start* #{#'metrics/statsd-reporter
@@ -188,9 +195,9 @@
(defn- validate-routes-against-config
([routes route-type]
(doseq [[topic-entity handler-map] routes]
- (let [route-config (-> (ziggurat-config)
- (get-in [route-type topic-entity]))
- channels (-> handler-map (dissoc :handler-fn) (keys) (set))
+ (let [route-config (-> (ziggurat-config)
+ (get-in [route-type topic-entity]))
+ channels (-> handler-map (dissoc :handler-fn) (keys) (set))
config-channels (-> (ziggurat-config)
(get-in [route-type topic-entity :channels])
(keys)
@@ -210,7 +217,7 @@
(validate-routes-against-config batch-routes :batch-routes)))
(defn- derive-modes [stream-routes batch-routes actor-routes]
- (let [base-modes [:management-api :worker]]
+ (let [base-modes [:management-api :worker]]
(if (and (nil? stream-routes) (nil? batch-routes))
(throw (IllegalArgumentException. "Either :stream-routes or :batch-routes should be present in init args")))
(cond-> base-modes
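
A condensed sketch of the startup behaviour the init changes introduce, using only the functions that appear in this patch:

(require '[cambium.codec :as codec]
         '[cambium.logback.json.flat-layout :as flat]
         '[ziggurat.config :refer [ziggurat-config]])

;; invoked from initialize-config, immediately after the config state is started
(when (= "json" (:log-format (ziggurat-config)))
  ;; tell cambium's FlatJsonLayout how to decode stringified MDC values back into
  ;; JSON types, using the cheshire-backed codec
  (flat/set-decoder! codec/destringify-val))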
diff --git a/src/ziggurat/kafka_consumer/consumer_driver.clj b/src/ziggurat/kafka_consumer/consumer_driver.clj
index 445fcf9e..78037fb5 100644
--- a/src/ziggurat/kafka_consumer/consumer_driver.clj
+++ b/src/ziggurat/kafka_consumer/consumer_driver.clj
@@ -6,12 +6,13 @@
[clojure.tools.logging :as log]
[ziggurat.kafka-consumer.executor-service :refer :all]
[mount.core :as mount]
- [ziggurat.metrics :as metrics])
+ [ziggurat.metrics :as metrics]
+ [cambium.core :as clog])
(:import (java.util.concurrent ExecutorService RejectedExecutionException)
(org.apache.kafka.clients.consumer Consumer)))
(defn- start-polling-with-consumer
[consumer init-arg topic-entity consumer-config]
(let [message-poller (cast Runnable #(ch/poll-for-messages consumer (:handler-fn init-arg) topic-entity consumer-config))]
(when message-poller
(try
@@ -43,7 +45,7 @@
(defn- stop-consumers [consumer-groups]
(do (log/info "stopping consumers")
(doseq [[topic-entity consumers] consumer-groups]
- (log/info "Stopping threads for consumer group: " topic-entity)
+ (clog/info {:consumer-group topic-entity} (str "Stopping threads for consumer group: " topic-entity))
(doseq [consumer consumers]
(.wakeup ^Consumer consumer)))))
diff --git a/src/ziggurat/kafka_consumer/consumer_handler.clj b/src/ziggurat/kafka_consumer/consumer_handler.clj
index 35c45ed1..752e543b 100644
--- a/src/ziggurat/kafka_consumer/consumer_handler.clj
+++ b/src/ziggurat/kafka_consumer/consumer_handler.clj
@@ -3,7 +3,8 @@
[ziggurat.config :refer :all]
[ziggurat.messaging.producer :as producer]
[ziggurat.message-payload :refer [map->MessagePayload]]
- [ziggurat.metrics :as metrics])
+ [ziggurat.metrics :as metrics]
+ [cambium.core :as clog])
(:import (org.apache.kafka.common.errors WakeupException)
(java.time Duration Instant)
(tech.gojek.ziggurat.internal InvalidReturnTypeException)
@@ -26,8 +27,8 @@
(producer/retry batch-payload))
([batch current-retry-count topic-entity]
(when (pos? (count batch))
- (let [message (map->MessagePayload {:message batch
- :retry-count current-retry-count
+ (let [message (map->MessagePayload {:message batch
+ :retry-count current-retry-count
:topic-entity topic-entity})]
(producer/retry message)))))
@@ -49,17 +50,17 @@
batch-size (count batch)]
(try
(when (not-empty batch)
- (log/infof "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size)
- (let [start-time (Instant/now)
- result (batch-handler batch)
- time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))]
+ (clog/info {:batch-size batch-size} (format "[Consumer Group: %s] Processing the batch with %d messages" topic-entity batch-size))
+ (let [start-time (Instant/now)
+ result (batch-handler batch)
+ time-taken-in-millis (.toMillis (Duration/between start-time (Instant/now)))]
(validate-batch-processing-result result)
(let [messages-to-be-retried (:retry result)
to-be-retried-count (count messages-to-be-retried)
skip-count (count (:skip result))
success-count (- batch-size (+ to-be-retried-count skip-count))]
- (log/infof "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n"
- topic-entity success-count skip-count to-be-retried-count)
+            (clog/info {:messages-successfully-processed success-count :messages-skipped skip-count :messages-to-be-retried to-be-retried-count}
+                       (format "[Consumer Group: %s] Processed the batch with success: [%d], skip: [%d] and retries: [%d] \n" topic-entity success-count skip-count to-be-retried-count))
(publish-batch-process-metrics topic-entity batch-size success-count skip-count to-be-retried-count time-taken-in-millis)
(retry messages-to-be-retried current-retry-count topic-entity))))
(catch InvalidReturnTypeException e
@@ -78,16 +79,17 @@
(defn poll-for-messages
[^Consumer consumer handler-fn topic-entity consumer-config]
- (try
- (loop [records []]
- (when (not-empty records)
- (let [batch-payload (create-batch-payload records topic-entity)]
- (process handler-fn batch-payload)))
- (recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
- (catch WakeupException e
- (log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
- (catch Exception e
- (log/errorf e "Exception while polling for messages for: %s" topic-entity))
- (finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
- (.close consumer)))))
+ (clog/with-logging-context {:consumer-group topic-entity}
+ (try
+ (loop [records []]
+ (when (not-empty records)
+ (let [batch-payload (create-batch-payload records topic-entity)]
+ (process handler-fn batch-payload)))
+ (recur (seq (.poll consumer (Duration/ofMillis (or (:poll-timeout-ms-config consumer-config) DEFAULT_POLL_TIMEOUT_MS_CONFIG))))))
+ (catch WakeupException e
+ (log/errorf e "WakeupException while polling for messages for: %s" topic-entity))
+ (catch Exception e
+ (log/errorf e "Exception while polling for messages for: %s" topic-entity))
+ (finally (do (log/info "Closing the Kafka Consumer for: " topic-entity)
+ (.close consumer))))))
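
The with-logging-context wrapper above installs a thread-local MDC, so the catch and finally log calls made through clojure.tools.logging also carry the consumer group. A small hedged illustration (the topic entity value is made up):

(clog/with-logging-context {:consumer-group :restaurant-updates}
  ;; both lines carry consumer-group in their MDC: cambium and clojure.tools.logging
  ;; log through the same SLF4J/logback backend
  (clog/info {:batch-size 10} "Processing the batch")
  (log/info "Closing the Kafka Consumer"))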
diff --git a/src/ziggurat/mapper.clj b/src/ziggurat/mapper.clj
index e720198f..28f5559d 100644
--- a/src/ziggurat/mapper.clj
+++ b/src/ziggurat/mapper.clj
@@ -6,7 +6,8 @@
[ziggurat.metrics :as metrics]
[ziggurat.new-relic :as nr]
[ziggurat.sentry :refer [sentry-reporter]]
- [ziggurat.util.error :refer [report-error]])
+ [ziggurat.util.error :refer [report-error]]
+ [cambium.core :as clog])
(:import (java.time Instant)))
(defn- send-msg-to-channel [channels message-payload return-code]
@@ -36,29 +37,30 @@
failure-metric "failure"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload)]
- (nr/with-tracing "job" new-relic-transaction-name
- (try
- (let [start-time (.toEpochMilli (Instant/now))
- return-code (user-handler-fn user-payload)
- end-time (.toEpochMilli (Instant/now))
- time-val (- end-time start-time)
- execution-time-namespace "handler-fn-execution-time"
- multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace)
- [execution-time-namespace]]]
- (metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags)
- (case return-code
- :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
- :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
+ (clog/with-logging-context {:consumer-group topic-entity-name}
+ (nr/with-tracing "job" new-relic-transaction-name
+ (try
+ (let [start-time (.toEpochMilli (Instant/now))
+ return-code (user-handler-fn user-payload)
+ end-time (.toEpochMilli (Instant/now))
+ time-val (- end-time start-time)
+ execution-time-namespace "handler-fn-execution-time"
+ multi-execution-time-namespaces [(conj base-metric-namespaces execution-time-namespace)
+ [execution-time-namespace]]]
+ (metrics/multi-ns-report-histogram multi-execution-time-namespaces time-val additional-tags)
+ (case return-code
+ :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
+ :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry message-payload))
- :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
- :block 'TODO
- (do
- (send-msg-to-channel channels message-payload return-code)
- (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags))))
- (catch Throwable e
- (producer/retry message-payload)
- (report-error e (str "Actor execution failed for " topic-entity-name))
- (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))
+ :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
+ :block 'TODO
+ (do
+ (send-msg-to-channel channels message-payload return-code)
+ (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags))))
+ (catch Throwable e
+ (producer/retry message-payload)
+ (report-error e (str "Actor execution failed for " topic-entity-name))
+ (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags))))))))
(defn channel-mapper-func [user-handler-fn channel]
(fn [{:keys [topic-entity] :as message-payload}]
@@ -76,26 +78,27 @@
failure-metric "failure"
multi-message-processing-namespaces [message-processing-namespaces [message-processing-namespace]]
user-payload (create-user-payload message-payload)]
- (nr/with-tracing "job" metric-namespace
- (try
- (let [start-time (.toEpochMilli (Instant/now))
- return-code (user-handler-fn user-payload)
- end-time (.toEpochMilli (Instant/now))
- time-val (- end-time start-time)
- execution-time-namespace "execution-time"
- multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace)
- [execution-time-namespace]]]
- (metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
- (case return-code
- :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
- :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
+ (clog/with-logging-context {:consumer-group topic-entity-name :channel channel-name}
+ (nr/with-tracing "job" metric-namespace
+ (try
+ (let [start-time (.toEpochMilli (Instant/now))
+ return-code (user-handler-fn user-payload)
+ end-time (.toEpochMilli (Instant/now))
+ time-val (- end-time start-time)
+ execution-time-namespace "execution-time"
+ multi-execution-time-namespace [(conj base-metric-namespaces execution-time-namespace)
+ [execution-time-namespace]]]
+ (metrics/multi-ns-report-histogram multi-execution-time-namespace time-val additional-tags)
+ (case return-code
+ :success (metrics/multi-ns-increment-count multi-message-processing-namespaces success-metric additional-tags)
+ :retry (do (metrics/multi-ns-increment-count multi-message-processing-namespaces retry-metric additional-tags)
(producer/retry-for-channel message-payload channel))
- :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
- :block 'TODO
- (throw (ex-info "Invalid mapper return code" {:code return-code}))))
- (catch Throwable e
- (producer/retry-for-channel message-payload channel)
- (report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
- (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags)))))))
+ :skip (metrics/multi-ns-increment-count multi-message-processing-namespaces skip-metric additional-tags)
+ :block 'TODO
+ (throw (ex-info "Invalid mapper return code" {:code return-code}))))
+ (catch Throwable e
+ (producer/retry-for-channel message-payload channel)
+ (report-error e (str "Channel execution failed for " topic-entity-name " and for channel " channel-name))
+ (metrics/multi-ns-increment-count multi-message-processing-namespaces failure-metric additional-tags))))))))
(defrecord MessagePayload [message topic-entity])
diff --git a/src/ziggurat/messaging/consumer.clj b/src/ziggurat/messaging/consumer.clj
index 47c92677..d843a35c 100644
--- a/src/ziggurat/messaging/consumer.clj
+++ b/src/ziggurat/messaging/consumer.clj
@@ -10,7 +10,8 @@
[ziggurat.messaging.connection :refer [connection]]
[ziggurat.messaging.util :as util]
[ziggurat.metrics :as metrics]
- [ziggurat.util.error :refer [report-error]]))
+ [ziggurat.util.error :refer [report-error]]
+ [cambium.core :as clog]))
(defn- reject-message
[ch delivery-tag]
@@ -103,7 +104,7 @@
(defn- message-handler [wrapped-mapper-fn topic-entity]
(fn [ch meta ^bytes payload]
- (process-message-from-queue ch meta payload topic-entity wrapped-mapper-fn)))
+ (clog/with-logging-context {:consumer-group topic-entity} (process-message-from-queue ch meta payload topic-entity wrapped-mapper-fn))))
(defn- start-subscriber* [ch prefetch-count queue-name wrapped-mapper-fn topic-entity]
(lb/qos ch prefetch-count)
diff --git a/src/ziggurat/streams.clj b/src/ziggurat/streams.clj
index b2f6ca30..f79006af 100644
--- a/src/ziggurat/streams.clj
+++ b/src/ziggurat/streams.clj
@@ -9,7 +9,8 @@
[ziggurat.metrics :as metrics]
[ziggurat.timestamp-transformer :as timestamp-transformer]
[ziggurat.tracer :refer [tracer]]
- [ziggurat.util.map :as umap])
+ [ziggurat.util.map :as umap]
+ [cambium.core :as clog])
(:import [io.opentracing.contrib.kafka TracingKafkaUtils]
[io.opentracing.contrib.kafka.streams TracingKafkaClientSupplier]
[io.opentracing.tag Tags]
@@ -112,8 +113,7 @@
stream-state))
(do
(.close stream)
- (log/info
- (str "Stopping Kafka stream with topic-entity " topic-entity))))))
+ (log/info (str "Stopping Kafka stream with topic-entity " topic-entity))))))
(defn stop-stream [topic-entity]
(let [stream (get stream topic-entity)]
@@ -220,8 +220,7 @@
(defn handle-uncaught-exception
[stream-thread-exception-response ^Throwable error]
- (log/infof "Ziggurat Streams Uncaught Exception Handler Invoked: [%s]"
- (.getMessage error))
+ (log/infof "Ziggurat Streams Uncaught Exception Handler Invoked: [%s]" (.getMessage error))
(case stream-thread-exception-response
:shutdown-application StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse/SHUTDOWN_APPLICATION
:replace-thread StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse/REPLACE_THREAD
@@ -245,9 +244,8 @@
(do
(.setUncaughtExceptionHandler stream
(reify StreamsUncaughtExceptionHandler
- (^StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse handle [_ ^Throwable error]
- (handle-uncaught-exception (get stream-config :stream-thread-exception-response :shutdown-client) error))))
- (.start stream)
+ (^StreamsUncaughtExceptionHandler$StreamThreadExceptionResponse handle [_ ^Throwable error] (handle-uncaught-exception (get stream-config :stream-thread-exception-response :shutdown-client) error))))
+ (clog/with-logging-context {:consumer-group topic-entity} (.start stream))
(assoc streams topic-entity stream))
streams)))
{}
diff --git a/src/ziggurat/util/logging.clj b/src/ziggurat/util/logging.clj
index c0e2480b..38e68a3a 100644
--- a/src/ziggurat/util/logging.clj
+++ b/src/ziggurat/util/logging.clj
@@ -1,11 +1,7 @@
(ns ziggurat.util.logging
- (:require [clojure.tools.logging :as log])
- (:import (org.apache.logging.log4j ThreadContext)))
+ (:require [cambium.core :as clog]))
(defmacro with-context
[context-map & body]
- `(try
- (doseq [[k# v#] ~context-map] (ThreadContext/put (name k#) (str v#)))
- ~@body
- (finally (doseq [[k# _#] ~context-map] (ThreadContext/remove (name k#))))))
+ `(clog/with-logging-context ~context-map ~@body))
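
The macro keeps its public shape, so existing callers are untouched; only the backing mechanism moves from log4j2's ThreadContext to the SLF4J MDC via cambium. A hedged usage sketch:

(require '[ziggurat.util.logging :as zlog]
         '[clojure.tools.logging :as log])

(zlog/with-context {:request-id "abc-123"}
  ;; request-id is present in the MDC for the duration of the body and removed on exit
  (log/info "processing request"))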
diff --git a/test/ziggurat/init_test.clj b/test/ziggurat/init_test.clj
index 0f1135c5..8987cb11 100644
--- a/test/ziggurat/init_test.clj
+++ b/test/ziggurat/init_test.clj
@@ -9,7 +9,11 @@
[ziggurat.streams :as streams :refer [stream]]
[ziggurat.server.test-utils :as tu]
[ziggurat.tracer :as tracer]
- [ziggurat.fixtures :refer [with-config]])
+ [ziggurat.fixtures :refer [with-config]]
+ [cambium.logback.json.flat-layout :as flat]
+ [cambium.codec :as codec]
+ [cambium.core :as clog]
+ [clojure.tools.logging :as log])
(:import (io.opentracing.mock MockTracer)))
(def valid-modes-count 4)
@@ -105,7 +110,24 @@
(swap! start-was-called not)
(is (= expected-stream-routes stream-router)))]
(init/main #() #() expected-stream-routes)
- (is @start-was-called)))))
+ (is @start-was-called))))
+ (testing "Flat Json Layout decoder is set if log format is json"
+ (let [start-was-called (atom false)
+ decoder-was-set (atom false)
+ expected-stream-routes {:default {:handler-fn #(constantly nil)}}
+ config config/default-config]
+ (with-redefs [init/add-shutdown-hook (fn [_ _] (constantly nil))
+ config/config-file "config.test.edn"
+ config/ziggurat-config (fn [] (assoc config :log-format "json"))
+ init/validate-routes-against-config (constantly nil)
+ init/start (fn [_ stream-router _ _ _]
+ (swap! start-was-called not)
+ (is (= expected-stream-routes stream-router)))
+ flat/set-decoder! (fn [decoder] (is (= decoder codec/destringify-val))
+ (reset! decoder-was-set true))]
+ (init/main #() #() expected-stream-routes)
+ (is @start-was-called)
+ (is @decoder-was-set)))))
(def mock-modes {:api-server {:start-fn (constantly nil) :stop-fn (constantly nil)}
:stream-worker {:start-fn (constantly nil) :stop-fn (constantly nil)}
diff --git a/test/ziggurat/util/logging_test.clj b/test/ziggurat/util/logging_test.clj
index 83e07877..e7ea66c0 100644
--- a/test/ziggurat/util/logging_test.clj
+++ b/test/ziggurat/util/logging_test.clj
@@ -1,20 +1,20 @@
(ns ziggurat.util.logging-test
(:require [clojure.test :refer :all]
[ziggurat.util.logging :as zlog])
- (:import (org.apache.logging.log4j ThreadContext)))
+ (:import (org.slf4j MDC)))
(deftest with-context-test
- (let [capture-context #(reset! % (into {} (ThreadContext/getImmutableContext)))]
- (testing "sets thread local log context params within body"
+ (let [capture-context #(reset! % (into {} (MDC/getCopyOfContextMap)))]
+ (testing "sets mdc context params within body"
(let [context-map (atom {})]
- (zlog/with-context {:param-1 "string-value", :param-2 123}
+ (zlog/with-context {:param-1 "string-value" :param-2 123}
(capture-context context-map))
- (is (= "string-value" (get @context-map "param-1")))
+ (is (= "\"string-value\"" (get @context-map "param-1")))
(is (= "123" (get @context-map "param-2")))))
- (testing "clears thread local log context params when exits"
+ (testing "clears mdc context params when exits"
(let [context-map (atom {})]
- (zlog/with-context {:param-1 "string-value", :param-2 123}
+ (zlog/with-context {:param-1 "string-value" :param-2 123}
(constantly nil))
(capture-context context-map)
(is (empty? @context-map))))))
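
The changed string expectation follows from the cheshire-backed codec: MDC values are serialised as JSON, so strings gain surrounding quotes while numbers keep their bare decimal form. A hedged REPL illustration:

(require '[cambium.codec :as codec])

(codec/stringify-val "string-value")        ;; => "\"string-value\""
(codec/stringify-val 123)                   ;; => "123"
(codec/destringify-val "\"string-value\"")  ;; => "string-value"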