Skip to content

Commit

Permalink
Merge pull request #1 from weka/node_ids
Browse files Browse the repository at this point in the history
Node ids
  • Loading branch information
vince-weka authored Dec 24, 2020
2 parents 83ce40f + 8629693 commit 6437c0a
Show file tree
Hide file tree
Showing 5 changed files with 1,359 additions and 1,052 deletions.
18 changes: 17 additions & 1 deletion etc_prometheus/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ alerting:
global:
evaluation_interval: 15s
scrape_interval: 60s
scrape_timeout: 50s
scrape_timeout: 30s
rule_files:
-
rules.yml
Expand All @@ -35,3 +35,19 @@ scrape_configs:
-
targets:
- "grafana:3000"
-
job_name: loki
scrape_interval: 15s
scrape_timeout: 5s
static_configs:
-
targets:
- "loki:3100"
-
job_name: alertmanager
scrape_interval: 15s
scrape_timeout: 5s
static_configs:
-
targets:
- "alertmanager:9093"
106 changes: 95 additions & 11 deletions export/collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,22 +250,53 @@ def collect( self ):
yield GaugeMetricFamily('weka_collect_seconds', 'Total Time spent in Prometheus collect', value=elapsed)
#weka_collect_gauge = GaugeMetricFamily('weka_collect_seconds', 'Total Time spent in Prometheus collect')
#weka_collect_gauge.add_metric(labels={}, value=elapsed)
log.info(f"status returned. total time = {elapsed}")
log.info(f"stats returned. total time = {elapsed}")

# runs in a thread, so args comes in as a dict
def call_api( self, cluster, metric, category, args ):
method = args['method']
parms = args['parms']
log.debug(f"method={method}, parms={parms}")
#log.debug(f"method={method}, parms={parms}")

#log.error(f"calling {cluster.name} API with {method} {parms}")
data_returned = cluster.call_api( method=method, parms=parms )

if category != None and not category in self.clusterdata[str(cluster)]:
self.clusterdata[str(cluster)][category] = {}

if len(data_returned) == 0:
return

if category == None:
self.clusterdata[str(cluster)][metric] = cluster.call_api( method=method, parms=parms )
#log.error(f"{type(data_returned)} {data_returned}")
if metric not in self.clusterdata[str(cluster)]:
if type(data_returned) == list:
self.clusterdata[str(cluster)][metric] = []
else:
self.clusterdata[str(cluster)][metric] = {}

if type(data_returned) == list:
self.clusterdata[str(cluster)][metric] += data_returned
else:
self.clusterdata[str(cluster)][metric].update(data_returned)

else:
#print( json.dumps( self.clusterdata, indent=4, sort_keys=True ))
#log.debug( self.clusterdata[str(cluster)].keys() )
if not category in self.clusterdata[str(cluster)]:
self.clusterdata[str(cluster)][category] = {}
self.clusterdata[str(cluster)][category][metric] = cluster.call_api( method=method, parms=parms )

#if not category in self.clusterdata[str(cluster)]:
# self.clusterdata[str(cluster)][category] = {}
if metric not in self.clusterdata[str(cluster)][category]:
if type(data_returned) == list:
self.clusterdata[str(cluster)][category][metric] = []
else:
self.clusterdata[str(cluster)][category][metric] = {}

if type(data_returned) == list:
self.clusterdata[str(cluster)][category][metric] += data_returned
else:
self.clusterdata[str(cluster)][category][metric].update(data_returned)



# start here
#
Expand All @@ -279,6 +310,7 @@ def gather( self, cluster ):
start_time = time.time()
log.info( "gathering weka data from cluster {}".format(str(cluster)) )


# re-initialize wekadata so changes in the cluster don't leave behind strange things (hosts/nodes that no longer exist, etc)
wekadata={}
self.clusterdata[str(cluster)] = wekadata # clear out old data
Expand Down Expand Up @@ -313,6 +345,8 @@ def gather( self, cluster ):
# clear old maps, if any - if nodes come/go this can get funky with old data, so re-create it every time
weka_maps = { "node-host": {}, "node-role": {}, "host-role": {} } # initial state of maps

#backend_nodes = []
#client_nodes = []
# populate maps
try:
for node in wekadata["nodeList"]:
Expand All @@ -330,17 +364,67 @@ def gather( self, cluster ):
log.error( "error building maps. Aborting data gather from cluster {}".format(str(cluster)) )
return

#log.error(f"backend nodes: {backend_nodes}")
#log.error(f"client nodes: {client_nodes}")

log.info(f"Cluster {cluster} Using {cluster.sizeof()} hosts")
thread_runner = simul_threads(cluster.sizeof()) # up the server count - so 1 thread per server in the cluster
#thread_runner = simul_threads(50) # testing

# be simplistic at first... let's just gather on a subset of nodes each query
#all_nodes = backend_nodes + client_nodes # concat both lists

node_maps = { "FRONTEND": [], "COMPUTE": [], "DRIVES": [], "MANAGEMENT": [] } # initial state of maps

#log.error(f'{weka_maps["node-role"]}')
server_FEs = [] # server FRONTEND nodes (for ops_nfs stats)

for node in weka_maps["node-role"]: # node == "NodeId<xx>"
for role in weka_maps['node-role'][node]:
nid = int(node[7:-1]) # make nodeid numeric
node_maps[role].append(nid)
if role == "FRONTEND" and weka_maps["host-role"][weka_maps["node-host"][node]] == "server":
server_FEs.append(nid)

#log.error(f"{cluster.name} {node_maps}")

# find a better place to define this... for now here is good (vince)
category_nodetypes = {
'cpu': ['FRONTEND','COMPUTE','DRIVES'],
'ops': ['FRONTEND'],
'ops_driver': ['FRONTEND'],
'ops_nfs': ['FRONTEND'], # not sure about this one
'ssd': ['DRIVES']
}

# schedule a bunch of data gather queries
for category, stat_dict in self.get_commandlist().items():

category_nodes = []
#log.error(f"{cluster.name} category is: {category} {category_nodetypes[category]}")
if category == "ops_nfs": # NFS stats only show on server FEs; no point in looking at anything else
category_nodes = server_FEs
else:
for nodetype in category_nodetypes[category]: # nodetype is FRONTEND, COMPUTE, DRIVES, MANAGEMENT
category_nodes += node_maps[nodetype]

#log.error(f"{cluster.name} cat nodes: {category} {category_nodes}")

query_nodes = list( set( category_nodes.copy() ) ) # make the list unique so we don't ask for the same data multiple times

for stat, command in stat_dict.items():
try:
thread_runner.new( self.call_api, (cluster, stat, category, command ) )
except:
log.error( "gather(): error scheduling thread wekastat for cluster {}".format(str(cluster)) )
step = 100
for i in range(0, len(query_nodes), step):
import copy
newcmd = copy.deepcopy(command) # make sure to copy it
newcmd["parms"]["node_ids"] = copy.deepcopy(query_nodes[i:i+step]) # make sure to copy it
#log.error(f"{i}: {i+step}, {cluster.name} {query_nodes[i:i+step]}" ) # debugging
#log.error(f"scheduling {cluster.name} {newcmd['parms']}" )
try:
thread_runner.new( self.call_api, (cluster, stat, category, newcmd ) )
#thread_runner.new( self.call_api, (cluster, stat, category, command ) )
except:
log.error( "gather(): error scheduling thread wekastat for cluster {}".format(str(cluster)) )

thread_runner.run() # schedule the rest of the threads, wait for them
del thread_runner
Expand Down
7 changes: 3 additions & 4 deletions export/wekacluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def call_api( self, method=None, parms={} ):
host = self.hosts.reserve()
api_return = None
while host != None:
log.debug( "calling Weka API on cluster {}, host {}".format(self.name,host) )
log.debug( f"calling Weka API on cluster {self.name}, host {host} {method} {parms}")
try:
api_return = host.call_api( method, parms )
except Exception as exc:
Expand All @@ -141,9 +141,8 @@ def call_api( self, method=None, parms={} ):
self.hosts.remove(host) # remove it from the hostlist iterable
host = self.hosts.reserve() # try another host; will return None if none left or failure
self.errors += 1
log.error( "cluster={}, error {} spawning command {} on host {}. Retrying on {}.".format(
self.name, exc, str(method), last_hostname, str(host)) )
print(traceback.format_exc())
#log.error(f"cluster={self.name}, error {exc} spawning command {str(method)}{parms} on host {last_hostname}. Retrying on {str(host)}.")
#print(traceback.format_exc())
continue

self.hosts.release(host) # release it so it can be used for more queries
Expand Down
Loading

0 comments on commit 6437c0a

Please sign in to comment.