Overhaul and update InfluxDB engine #98

Open · wants to merge 3 commits into master
21 changes: 19 additions & 2 deletions src/pyrocore/daemon/webapp.py
@@ -125,7 +125,7 @@ def json_engine(self, req): # pylint: disable=R0201,W0613
""" Return torrent engine data.
"""
try:
return stats.engine_data(config.engine)
return wrap_engine_data(config.engine)
except (error.LoggableError, xmlrpc.ERRORS) as torrent_exc:
raise exc.HTTPInternalServerError(str(torrent_exc))

@@ -252,6 +252,23 @@ def make_app(httpd_config):
.add_route("/json/{action}", controller=JsonController(**httpd_config.json))
)

def wrap_engine_data(engine):
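""" Rebuild the original engine_data() result structure (rates, uptime, view sizes) from the new flat data. """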
result = stats.engine_data(engine)

# Build result object
return dict(
now = time.time(),
engine_id = engine.engine_id,
versions = engine.versions,
uptime = engine.uptime,
upload = [result['throttle.global_up.rate'], result['throttle.global_up.max_rate']],
download = [result['throttle.global_down.rate'], result['throttle.global_down.max_rate']],
views = dict([(name, values['size'])
for name, values in result['views'].items()
]),
)



def module_test():
""" Quick test using…
@@ -264,7 +281,7 @@ def module_test():
try:
engine = connect()
print("%s - %s" % (engine.engine_id, engine.open()))
pprint.pprint(stats.engine_data(engine))
pprint.pprint(wrap_engine_data(engine))
print("%s - %s" % (engine.engine_id, engine.open()))
except (error.LoggableError, xmlrpc.ERRORS) as torrent_exc:
print("ERROR: %s" % torrent_exc)
10 changes: 2 additions & 8 deletions src/pyrocore/data/config/torque.ini
@@ -97,14 +97,8 @@ job.fluxstats.schedule = second=*/15
job.fluxstats.active = False
;job.fluxstats.log_level = DEBUG

; Database name
job.fluxstats.dbname = torque
; Series name for rTorrent data (set empty to disable)
job.fluxstats.series = rtorrent
; Series name for host data (set empty to disable)
job.fluxstats.series_host = host
;job.fluxstats.
;job.fluxstats.
; Series prefix
job.fluxstats.series_prefix = rtorrent_
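; With the default prefix the measurements written are "rtorrent_stat" and "rtorrent_view"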

# Tree watch
job.treewatch.handler = pyrocore.torrent.watch:TreeWatch
106 changes: 39 additions & 67 deletions src/pyrocore/torrent/jobs.py
@@ -18,41 +18,15 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

try:
import json
except ImportError:
import simplejson as json # pylint: disable=F0401

import requests
from requests.exceptions import RequestException
from requests.exceptions import RequestException, HTTPError

from pyrobase.parts import Bunch
from pyrocore import error
from pyrocore import config as config_ini
from pyrocore.util import fmt, xmlrpc, pymagic, stats


def _flux_engine_data(engine):
""" Return rTorrent data set for pushing to InfluxDB.
"""
data = stats.engine_data(engine)

# Make it flat
data["up_rate"] = data["upload"][0]
data["up_limit"] = data["upload"][1]
data["down_rate"] = data["download"][0]
data["down_limit"] = data["download"][1]
data["version"] = data["versions"][0]
views = data["views"]

del data["upload"]
del data["download"]
del data["versions"]
del data["views"]

return data, views


class EngineStats(object):
""" rTorrent connection statistics logger.
"""
@@ -75,7 +49,7 @@ def run(self):
fmt.human_duration(proxy.system.time() - config_ini.engine.startup, 0, 2, True).strip(),
proxy
))
except (error.LoggableError, xmlrpc.ERRORS), exc:
except (error.LoggableError, xmlrpc.ERRORS) as exc:
self.LOG.warn(str(exc))


@@ -99,63 +73,61 @@ def __init__(self, config=None):
def _influxdb_url(self):
""" Return REST API URL to access time series.
"""
url = "{0}/db/{1}/series".format(self.influxdb.url.rstrip('/'), self.config.dbname)
url = "{0}/write?db={1}".format(self.influxdb.url.rstrip('/'), self.config.dbname)

if self.influxdb.user and self.influxdb.password:
url += "?u={0}&p={1}".format(self.influxdb.user, self.influxdb.password)
url += "&u={0}&p={1}".format(self.influxdb.user, self.influxdb.password)

return url

def _influxdb_data(self):
""" Return statitics data formatted according to InfluxDB's line protocol
"""
datastr = ''
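# Each line is InfluxDB line protocol: <measurement>,<tag>=<value>,... <field>=<value>,...
# e.g. (hypothetical values) "rtorrent_stat,hostname=myhost,pid=1234 throttle.global_up.rate=0,network.open_sockets=130"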

try:
proxy = config_ini.engine.open()
hostname = proxy.system.hostname()
pid = proxy.system.pid()
data = stats.engine_data(config_ini.engine)
views = data['views']
del data['views']
datastr = u"{0}stat,hostname={1},pid={2} ".format(
self.config.series_prefix, hostname, pid)
datastr += ','.join(['='.join([k, str(v)]) for k, v in data.items()]) + '\n'
for view_name, values in views.items():
vstr = u"{0}view,hostname={1},pid={2},name={3} ".format(
self.config.series_prefix, hostname, pid, view_name)
vstr += ','.join(['='.join([k, str(v)]) for k, v in values.items()])
datastr += vstr + "\n"
except (error.LoggableError, xmlrpc.ERRORS) as exc:
self.LOG.warn("InfluxDB stats: {0}".format(exc))
return datastr

def _push_data(self):
""" Push stats data to InfluxDB.
"""
if not (self.config.series or self.config.series_host):
self.LOG.info("Misconfigured InfluxDB job, neither 'series' nor 'series_host' is set!")
return

# Assemble data
fluxdata = []

if self.config.series:
try:
config_ini.engine.open()
data, views = _flux_engine_data(config_ini.engine)
fluxdata.append(dict(
name=self.config.series,
columns=data.keys(),
points=[data.values()]
))
fluxdata.append(dict(
name=self.config.series + '_views',
columns=views.keys(),
points=[views.values()]
))
except (error.LoggableError, xmlrpc.ERRORS), exc:
self.LOG.warn("InfluxDB stats: {0}".format(exc))

# if self.config.series_host:
# fluxdata.append(dict(
# name = self.config.series_host,
# columns = .keys(),
# points = [.values()]
# ))

if not fluxdata:
datastr = self._influxdb_data()

if not datastr:
self.LOG.debug("InfluxDB stats: no data (previous errors?)")
return

# Encode into InfluxDB data packet
fluxurl = self._influxdb_url()
fluxjson = json.dumps(fluxdata)
self.LOG.debug("POST to {0} with {1}".format(fluxurl.split('?')[0], fluxjson))
self.LOG.debug("POST to {0} with {1}".format(fluxurl.split('?')[0], datastr))

# Push it!
try:
# TODO: Use a session
requests.post(fluxurl, data=fluxjson, timeout=self.influxdb.timeout)
except RequestException, exc:
self.LOG.info("InfluxDB POST error: {0}".format(exc))
response = requests.post(fluxurl, data=datastr, timeout=self.influxdb.timeout)
response.raise_for_status()
except HTTPError as exc:
# HTTPError derives from RequestException, so catch it first to keep this branch reachable
self.LOG.warn("InfluxDB POST HTTP error {0}: Response: {1}".format(
str(response.status_code), response.content))
except RequestException as exc:
self.LOG.warn("InfluxDB POST error: {0}".format(exc))


def run(self):
@@ -183,7 +155,7 @@ def module_test():
pprint.pprint(views)

print("%s - %s" % (engine.engine_id, engine.open()))
except (error.LoggableError, xmlrpc.ERRORS), torrent_exc:
except (error.LoggableError, xmlrpc.ERRORS) as torrent_exc:
print("ERROR: %s" % torrent_exc)


61 changes: 42 additions & 19 deletions src/pyrocore/util/stats.py
@@ -19,36 +19,59 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from __future__ import absolute_import

import time

from pyrocore import error
from pyrocore.util import xmlrpc

def engine_data(engine):
""" Get important performance data and metadata from rTorrent.
"""
views = ("default", "main", "started", "stopped", "complete",
"incomplete", "seeding", "leeching", "active", "messages")
proxy = engine.open()
views = proxy.view.list()

methods = [
"throttle.global_up.rate", "throttle.global_up.max_rate",
"throttle.global_down.rate", "throttle.global_down.max_rate",
"pieces.stats_not_preloaded", "pieces.stats_preloaded",
"system.files.opened_counter", "system.files.failed_counter", "system.files.closed_counter",
"pieces.memory.block_count", "pieces.memory.current",
"network.open_sockets"
]

# Get data via multicall
proxy = engine.open()
calls = [dict(methodName=method, params=[]) for method in methods] \
+ [dict(methodName="view.size", params=['', view]) for view in views]
result = proxy.system.multicall(calls, flatten=True)
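# The flattened reply is positional: one value per method above, then one view.size per view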
result_dict = {}
for m in methods:
result_dict[m] = result[0]
del result[0]
result_dict['views'] = {}
for v in views:
result_dict['views'][v] = {}
result_dict['views'][v]['size'] = result[0]
del result[0]
return result_dict

def module_test():
""" Quick test using…

python -m pyrocore.util.stats
"""
import pprint
from pyrocore import connect

try:
engine = connect()
print("%s - %s" % (engine.engine_id, engine.open()))

result = engine_data(engine)
print "result = ",
pprint.pprint(result)

print("%s - %s" % (engine.engine_id, engine.open()))
except (error.LoggableError, xmlrpc.ERRORS) as torrent_exc:
print("ERROR: %s" % torrent_exc)


# Build result object
data = dict(
now = time.time(),
engine_id = engine.engine_id,
versions = engine.versions,
uptime = engine.uptime,
upload = [result[0], result[1]],
download = [result[2], result[3]],
views = dict([(name, result[4+i])
for i, name in enumerate(views)
]),
)

return data
if __name__ == "__main__":
module_test()