Skip to content

Commit

Permalink
change the order of the components when being stopped
Browse files Browse the repository at this point in the history
  • Loading branch information
macalimlim committed Sep 27, 2021
1 parent e537080 commit dd28834
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
25 changes: 14 additions & 11 deletions src/ziggurat/init.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
(:require [cambium.codec :as codec]
[cambium.logback.json.flat-layout :as flat]
[clojure.set :as set]
[clojure.string :as str]
[clojure.tools.logging :as log]
[mount.core :as mount :refer [defstate]]
[mount.core :as mount]
[schema.core :as s]
[ziggurat.config :refer [ziggurat-config] :as config]
[ziggurat.kafka-consumer.consumer-driver :as consumer-driver]
[ziggurat.kafka-consumer.executor-service :as executor-service]
[ziggurat.messaging.connection :as messaging-connection :refer [connection]]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
Expand All @@ -17,9 +20,7 @@
[ziggurat.server :as server]
[ziggurat.streams :as streams]
[ziggurat.tracer :as tracer]
[ziggurat.util.java-util :as util]
[ziggurat.kafka-consumer.executor-service :as executor-service]
[ziggurat.kafka-consumer.consumer-driver :as consumer-driver])
[ziggurat.util.java-util :as util])
(:gen-class
:methods [^{:static true} [init [java.util.Map] void]]
:name tech.gojek.ziggurat.internal.Init))
Expand Down Expand Up @@ -47,7 +48,7 @@
(messaging-producer/make-queues (event-routes args)))

(defn- set-properties-for-structured-logging []
(if (= (:log-format (ziggurat-config)) "json")
(when (= (:log-format (ziggurat-config)) "json")
(flat/set-decoder! codec/destringify-val)))

(defn start-kafka-producers []
Expand Down Expand Up @@ -166,9 +167,9 @@
(defn stop
"Calls the Ziggurat's state stop fns and then actor-stop-fn."
[actor-stop-fn modes]
(execute-function modes :stop-fn)
(actor-stop-fn)
(stop-common-states)
(execute-function modes :stop-fn))
(stop-common-states))

(defn- add-shutdown-hook [actor-stop-fn modes]
(.addShutdownHook
Expand All @@ -186,6 +187,8 @@
{s/Keyword {:handler-fn (s/pred #(fn? %))
s/Keyword (s/pred #(fn? %))}}))

(declare BatchRoute)

(s/defschema BatchRoute
(s/conditional
#(and (seq %)
Expand All @@ -206,7 +209,7 @@
(throw (IllegalArgumentException. (format "Error! Route %s isn't present in the %s config" topic-entity route-type)))
(when-not (set/subset? channels config-channels)
(let [diff (set/difference channels config-channels)]
(throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (clojure.string/join "," diff) route-type))))))))))
(throw (IllegalArgumentException. (format "Error! The channel(s) %s aren't present in the channels config of %s " (str/join "," diff) route-type))))))))))

(defn validate-routes [stream-routes batch-routes modes]
(when (contains? (set modes) :stream-worker)
Expand All @@ -218,12 +221,12 @@

(defn- derive-modes [stream-routes batch-routes actor-routes]
(let [base-modes [:management-api :worker]]
(if (and (nil? stream-routes) (nil? batch-routes))
(when (and (nil? stream-routes) (nil? batch-routes))
(throw (IllegalArgumentException. "Either :stream-routes or :batch-routes should be present in init args")))
(cond-> base-modes
(some? stream-routes) (conj :stream-worker)
(some? batch-routes) (conj :batch-worker)
(some? actor-routes) (conj :api-server))))
(some? batch-routes) (conj :batch-worker)
(some? actor-routes) (conj :api-server))))

(defn validate-modes [modes stream-routes batch-routes actor-routes]
(let [derived-modes (if-not (empty? modes)
Expand Down
33 changes: 20 additions & 13 deletions test/ziggurat/init_test.clj
Original file line number Diff line number Diff line change
@@ -1,20 +1,17 @@
(ns ziggurat.init-test
(:require [clojure.test :refer :all]
[mount.core :refer [defstate] :as mount]
(:require [clojure.test :refer [deftest is testing]]
[mount.core :as mount]
[ziggurat.config :as config]
[ziggurat.init :as init]
[ziggurat.messaging.connection :as rmqc]
[ziggurat.messaging.consumer :as messaging-consumer]
[ziggurat.messaging.producer :as messaging-producer]
[ziggurat.streams :as streams :refer [stream]]
[ziggurat.streams :as streams]
[ziggurat.server.test-utils :as tu]
[ziggurat.tracer :as tracer]
[ziggurat.fixtures :refer [with-config]]
[cambium.logback.json.flat-layout :as flat]
[cambium.codec :as codec]
[cambium.core :as clog]
[clojure.tools.logging :as log])

[cambium.codec :as codec])
(:import (io.opentracing.mock MockTracer)))

(def valid-modes-count 4)
Expand All @@ -25,8 +22,7 @@

(deftest start-calls-actor-start-fn-test
(testing "The actor start fn starts before the ziggurat state and can read config"
(let [result (atom 1)
start-messaging-internal-call-count 2]
(let [result (atom 1)]
(with-redefs [streams/start-streams (fn [_ _] (reset! result (* @result 2)))
streams/stop-streams (constantly nil)
;; will be called valid modes number of times
Expand All @@ -47,20 +43,19 @@
(with-config
(do (init/start #() {} {} [] nil)
(init/stop #(reset! result (+ @result 3)) nil)
(is (= 8 @result))))))))
(is (= 5 @result))))))))

(deftest stop-calls-idempotentcy-test
(testing "The stop function should be idempotent"
(let [result (atom 1)
stop-connection-internal-call-count 1]
(let [result (atom 1)]
(with-redefs [streams/start-streams (constantly nil)
streams/stop-streams (constantly nil)
rmqc/stop-connection (fn [_] (reset! result (* @result 2)))
tracer/create-tracer (fn [] (MockTracer.))]
(with-config
(do (init/start #() {} {} [] nil)
(init/stop #(reset! result (+ @result 3)) nil)
(is (= 8 @result))))))))
(is (= 5 @result))))))))

(deftest start-calls-make-queues-with-both-streams-and-batch-routes-test
(testing "Start calls make queues with both streams and batch routes"
Expand Down Expand Up @@ -327,3 +322,15 @@
:channel-2 #()}}]
(is (thrown? IllegalArgumentException (init/validate-routes stream-routes batch-routes modes)))))))))

(deftest stop-test
(testing "the following components execute-function -> actor-stop-fn -> stop-common-states should be stopped"
(let [is-execute-function-called? (atom false)
is-actor-stop-fn-called? (atom false)
is-stop-common-states-called? (atom false)
actor-stop-fn (fn [] (reset! is-actor-stop-fn-called? true))]
(with-redefs [init/execute-function (fn [_ _] (reset! is-execute-function-called? true))
init/stop-common-states (fn [] (reset! is-stop-common-states-called? true))]
(init/stop actor-stop-fn {})
(is (true? @is-execute-function-called?))
(is (true? @is-actor-stop-fn-called?))
(is (true? @is-stop-common-states-called?))))))

0 comments on commit dd28834

Please sign in to comment.