Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Socket hb #750

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions storm-core/src/clj/backtype/storm/cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
(update-storm! [this storm-id new-elems])
(remove-storm-base! [this storm-id])
(set-assignment! [this storm-id info])
(remove-assignment! [this storm-id])
(remove-storm! [this storm-id])
(report-error [this storm-id task-id error])
(errors [this storm-id task-id])
Expand Down Expand Up @@ -327,6 +328,9 @@
(set-data cluster-state (assignment-path storm-id) (Utils/serialize info))
)

(remove-assignment! [this storm-id]
(delete-node cluster-state (assignment-path storm-id)))

(remove-storm! [this storm-id]
(delete-node cluster-state (assignment-path storm-id))
(remove-storm-base! this storm-id))
Expand Down
55 changes: 42 additions & 13 deletions storm-core/src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@
;; tracked through heartbeat-cache
(defn- update-executor-cache [curr hb]
(let [reported-time (:time-secs hb)
uptime (:uptime hb)
{last-nimbus-time :nimbus-time
last-reported-time :executor-reported-time} curr
reported-time (cond reported-time reported-time
Expand All @@ -339,7 +340,8 @@
last-nimbus-time
)]
{:nimbus-time nimbus-time
:executor-reported-time reported-time}))
:executor-reported-time reported-time
:uptime uptime}))

(defn update-heartbeat-cache [cache executor-beats all-executors]
(let [cache (select-keys cache all-executors)]
Expand Down Expand Up @@ -381,20 +383,25 @@
(filter (fn [executor]
(let [start-time (get executor-start-times executor)
nimbus-time (-> heartbeats-cache (get executor) :nimbus-time)]
(log-debug "Exetutor " storm-id ":" executor " start-time: " start-time " nimbus-time: " nimbus-time)
(if (and start-time
(or
(< (time-delta start-time)
(conf NIMBUS-TASK-LAUNCH-SECS))
(not nimbus-time)
(< (time-delta nimbus-time)
(conf NIMBUS-TASK-TIMEOUT-SECS))
))
(if-not nimbus-time
(do
(log-debug "nimbus-time is nil, check nimbus if start time longer than NIMBUS-RECOIVER-HEARTBEART-SECS")
(< ((:uptime nimbus)) (conf NIMBUS-RECOVER-HEARTBEART-SECS)))
(do
(log-debug "nimbus-time is " nimbus-time " check executor heartbeat time")
(or
(< (time-delta start-time)
(conf NIMBUS-TASK-LAUNCH-SECS))
(< (time-delta nimbus-time)
(conf NIMBUS-TASK-TIMEOUT-SECS))))))
true
(do
(log-message "Executor " storm-id ":" executor " not alive")
(log-message "Executor " storm-id ":" executor " not alive, start-time is " (if start-time start-time "nil") " nimbus-time " (if nimbus-time nimbus-time "nil"))
false))
)))
doall)))
doall)))


(defn- to-executor-id [task-ids]
Expand Down Expand Up @@ -537,7 +544,7 @@
storm-cluster-state (:storm-cluster-state nimbus)
topology->executors (compute-topology->executors nimbus (keys existing-assignments))
;; update the executors heartbeats first.
_ (update-all-heartbeats! nimbus existing-assignments topology->executors)
;;_ (update-all-heartbeats! nimbus existing-assignments topology->executors)
topology->alive-executors (compute-topology->alive-executors nimbus
existing-assignments
topologies
Expand Down Expand Up @@ -642,7 +649,8 @@
;; for the topology which wants rebalance (specified by the scratch-topology-id)
;; we exclude its assignment, meaning that all the slots occupied by its assignment
;; will be treated as free slot in the scheduler code.
(when (or (nil? scratch-topology-id) (not= tid scratch-topology-id))
(if (and (not-nil? scratch-topology-id) (= tid scratch-topology-id))
(.remove-assignment! storm-cluster-state tid)
{tid (.assignment-info storm-cluster-state tid nil)})))
;; make the new assignments for topologies
topology->executor->node+port (compute-new-topology->executor->node+port
Expand Down Expand Up @@ -974,6 +982,27 @@
(transition-name! nimbus storm-name [:rebalance wait-amt num-workers executor-overrides] true)
))

(workerHeartBeat [this storm-id work-id port executors uptime hbtime stats]
(let [hb { :storm-id storm-id
:time-secs hbtime
:uptime uptime
}
node-port (str work-id "-" port)
byte-len (.remaining stats)
data (byte-array byte-len)
_ (.get stats data)
executor-stats (Utils/deserialize data)
cache (@(:heartbeats-cache nimbus) storm-id)
cache (if cache cache {})
newcache (into {}
(for [executor executors :let [curr (cache executor)]]
[executor (merge (update-executor-cache curr hb) {:stats (get executor-stats executor)})]))
]
(log-debug "worker heartbeat storm-id: " storm-id " worker-id: " work-id " port: " port "executors: " executors " uptime: " uptime " hbtime: " hbtime)

(swap! (:heartbeats-cache nimbus) assoc storm-id (merge cache newcache))
))

(activate [this storm-name]
(transition-name! nimbus storm-name :activate true)
)
Expand Down Expand Up @@ -1087,7 +1116,7 @@
task->component (storm-task-info (try-read-storm-topology conf storm-id) (try-read-storm-conf conf storm-id))
base (.storm-base storm-cluster-state storm-id nil)
assignment (.assignment-info storm-cluster-state storm-id nil)
beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
beats (@(:heartbeats-cache nimbus) storm-id)
all-components (-> task->component reverse-map keys)
errors (->> all-components
(map (fn [c] [c (get-errors storm-cluster-state storm-id c)]))
Expand Down
21 changes: 15 additions & 6 deletions storm-core/src/clj/backtype/storm/daemon/worker.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
(:import [java.util.concurrent Executors])
(:import [backtype.storm.messaging TransportFactory])
(:import [backtype.storm.messaging IContext IConnection])
(:import [java.nio ByteBuffer])
(:import [backtype.storm.utils WorkerHbProxy])
(:gen-class))

(bootstrap)
Expand All @@ -29,13 +31,19 @@
(->> executors
(map (fn [e] {(executor/get-executor-id e) (executor/render-stats e)}))
(apply merge)))
zk-hb {:storm-id (:storm-id worker)
:executor-stats stats
:uptime ((:uptime worker))
:time-secs (current-time-secs)
}]
storm-id (:storm-id worker)
hb-proxy (:heartbeat-proxy worker)
assignment-id (:assignment-id worker)
port (:port worker)
uptime ((:uptime worker))
time-secs (current-time-secs)
executors (:executors worker)
stats-ser (Utils/serialize stats)]
;;do socket heartbeat
(log-message "stats key type:" (map type executors))
(.workerHeartBeat hb-proxy storm-id assignment-id port executors uptime time-secs (ByteBuffer/wrap stats-ser))
;; do the zookeeper heartbeat
(.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)
;(.worker-heartbeat! (:storm-cluster-state worker) (:storm-id worker) (:assignment-id worker) (:port worker) zk-hb)
))

(defn do-heartbeat [worker]
Expand Down Expand Up @@ -204,6 +212,7 @@
:user-shared-resources (mk-user-resources <>)
:transfer-local-fn (mk-transfer-local-fn <>)
:transfer-fn (mk-transfer-fn <>)
:heartbeat-proxy (WorkerHbProxy. (conf NIMBUS-HOST) (conf NIMBUS-THRIFT-PORT) (HashMap. conf))
)))

(defn- endpoint->string [[node port]]
Expand Down
5 changes: 5 additions & 0 deletions storm-core/src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ public class Config extends HashMap<String, Object> {
public static final String NIMBUS_TASK_TIMEOUT_SECS = "nimbus.task.timeout.secs";
public static final Object NIMBUS_TASK_TIMEOUT_SECS_SCHEMA = Number.class;

/**
* How long nimbus start wait for executor heartbeat connect
*/
public static String NIMBUS_RECOVER_HEARTBEART_SECS = "nimbus.recover.heartbeat.secs";
public static final Object NIMBUS_RECOVER_HEARTBEART_SECS_SCHEMA = Number.class;

/**
* How often nimbus should wake up to check heartbeats and do reassignments. Note
Expand Down
8 changes: 5 additions & 3 deletions storm-core/src/jvm/backtype/storm/Constants.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package backtype.storm;

import backtype.storm.coordination.CoordinatedBolt;
import clojure.lang.RT;

import java.util.ArrayList;
import java.util.Arrays;


public class Constants {
public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";
public static final String COORDINATED_STREAM_ID = CoordinatedBolt.class.getName() + "/coord-stream";

public static final long SYSTEM_TASK_ID = -1;
public static final Object SYSTEM_EXECUTOR_ID = RT.readString("[-1 -1]");
public static final ArrayList<Integer> SYSTEM_EXECUTOR_ID = new ArrayList<Integer>(Arrays.asList(-1,-1));
public static final String SYSTEM_COMPONENT_ID = "__system";
public static final String SYSTEM_TICK_STREAM_ID = "__tick";
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
Expand Down
Loading