From 6e25d0c5856f2f07d567949a59545e9719e79c07 Mon Sep 17 00:00:00 2001 From: Kimura Sotaro Date: Sun, 18 Aug 2013 22:50:13 +0900 Subject: [PATCH] Add Funcion: Save storm version to ZooKeeper --- storm-core/src/clj/backtype/storm/cluster.clj | 9 +++++++++ storm-core/src/clj/backtype/storm/config.clj | 9 +++++++++ storm-core/src/clj/backtype/storm/daemon/nimbus.clj | 1 + storm-core/src/clj/backtype/storm/ui/core.clj | 9 --------- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj index 7231b15d6..f96d03d3a 100644 --- a/storm-core/src/clj/backtype/storm/cluster.clj +++ b/storm-core/src/clj/backtype/storm/cluster.clj @@ -114,6 +114,7 @@ (remove-worker-heartbeat! [this storm-id node port]) (supervisor-heartbeat! [this supervisor-id info]) (activate-storm! [this storm-id storm-base]) + (initialize-version! [this]) (update-storm! [this storm-id new-elems]) (remove-storm-base! [this storm-id]) (set-assignment! [this storm-id info]) @@ -131,6 +132,7 @@ (def SUPERVISORS-ROOT "supervisors") (def WORKERBEATS-ROOT "workerbeats") (def ERRORS-ROOT "errors") +(def VERSION-PATH "version") (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT)) (def STORMS-SUBTREE (str "/" STORMS-ROOT)) @@ -159,6 +161,9 @@ (defn error-path [storm-id component-id] (str (error-storm-root storm-id) "/" (url-encode component-id))) +(defn version-path [] + (str "/" VERSION-PATH)) + (defn- issue-callback! [cb-atom] (let [cb @cb-atom] (reset! cb-atom nil) @@ -304,6 +309,10 @@ (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base)) ) + (initialize-version! [this] + (set-data cluster-state (version-path) (Utils/serialize (read-storm-version))) + ) + (update-storm! [this storm-id new-elems] (let [base (storm-base this storm-id nil) executors (:component->executors base) diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj index c11074eb1..e0f20288e 100644 --- a/storm-core/src/clj/backtype/storm/config.clj +++ b/storm-core/src/clj/backtype/storm/config.clj @@ -5,6 +5,7 @@ (:import [org.apache.commons.io FileUtils]) (:require [clojure [string :as str]]) (:use [backtype.storm util]) + (:use [clojure.string :only [trim]]) ) (def RESOURCES-SUBDIR "resources") @@ -183,6 +184,14 @@ (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path))) )) +(defn read-storm-version [] + (let [storm-home (System/getProperty "storm.home") + release-path (format "%s/RELEASE" storm-home) + release-file (File. release-path)] + (if (and (.exists release-file) (.isFile release-file)) + (trim (slurp release-path)) + "Unknown"))) + (defn worker-root ([conf] (str (conf STORM-LOCAL-DIR) "/workers")) diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj index e126a26c7..e749bc8ec 100644 --- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj @@ -878,6 +878,7 @@ (log-message "Starting Nimbus with conf " conf) (let [nimbus (nimbus-data conf inimbus)] (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf) + (.initialize-version! (:storm-cluster-state nimbus)) (cleanup-corrupt-topologies! nimbus) (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))] (transition! nimbus storm-id :startup)) diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index d8a9d9897..904ef85fc 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -6,7 +6,6 @@ (:use [backtype.storm.ui helpers]) (:use [backtype.storm.daemon [common :only [ACKER-COMPONENT-ID system-id?]]]) (:use [ring.adapter.jetty :only [run-jetty]]) - (:use [clojure.string :only [trim]]) (:import [backtype.storm.generated ExecutorSpecificStats ExecutorStats ExecutorSummary TopologyInfo SpoutStats BoltStats ErrorInfo ClusterSummary SupervisorSummary TopologySummary @@ -54,14 +53,6 @@ (seq body) ])) -(defn read-storm-version [] - (let [storm-home (System/getProperty "storm.home") - release-path (format "%s/RELEASE" storm-home) - release-file (File. release-path)] - (if (and (.exists release-file) (.isFile release-file)) - (trim (slurp release-path)) - "Unknown"))) - (defn cluster-summary-table [^ClusterSummary summ] (let [sups (.get_supervisors summ) used-slots (reduce + (map #(.get_num_used_workers ^SupervisorSummary %) sups))