-
Notifications
You must be signed in to change notification settings - Fork 0
/
stats.py
95 lines (82 loc) · 3.12 KB
/
stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import json
import mmap
import os
from collections import defaultdict
from datetime import datetime, timedelta, timezone

import filelock
import requests
# append-only JSON-lines file of stats snapshots (one dict per line)
DUMP_FILE = 'stats.db'
# monero wallet address whose pool stats are tracked
WALLET_ADDR = '442uGwAdS8c3mS46h6b7KMPQiJcdqmLjjbuetpCfSKzcgv4S56ASPdvXdySiMizGTJ56ScZUyugpSeV6hx19QohZTmjuWiM'
# XMR -> USD exchange-rate endpoint
TICKER_URL = 'https://api.cryptonator.com/api/ticker/xmr-usd'
# per-wallet pool statistics endpoint
STATS_URL = 'https://api.xmrpool.net/miner/{}/stats'.format(WALLET_ADDR)
# per-wallet list of active worker identifiers
MINER_URL = 'https://api.xmrpool.net/miner/{}/identifiers'.format(WALLET_ADDR)
# cross-process lock guarding reads/writes of DUMP_FILE
LOCK = filelock.FileLock('/tmp/stats.lock')
# cache results in memory, refresh at an interval
# lower than the stats update interval
CACHED = {}
# per-cache-key time of last successful refresh; defaultdict means an
# unseen key reads as "checked just now" (the `key not in CACHED` branch
# still forces the first refresh)
LAST_CHECKED = defaultdict(lambda: datetime.now())
REFRESH_INTERVAL = 60 * 5 # seconds
def get_stats():
    """Fetch current pool stats and the XMR/USD ticker from the remote APIs.

    Returns a dict with keys 'ticker', 'stats', 'n_miners' and
    'timestamp' (a UTC Unix timestamp as a float).

    Raises requests.RequestException on network failure or timeout.
    """
    # a hung remote API must not block the caller forever
    timeout = 10
    miners = requests.get(MINER_URL, timeout=timeout).json()
    data = {}
    data['ticker'] = requests.get(TICKER_URL, timeout=timeout).json()['ticker']
    data['stats'] = requests.get(STATS_URL, timeout=timeout).json()
    data['n_miners'] = len(miners)
    # use an aware datetime: the old `utcnow().timestamp()` treated the
    # naive value as *local* time, yielding a wrong epoch off-UTC hosts
    data['timestamp'] = datetime.now(timezone.utc).timestamp()
    return data
def snapshot_stats():
    """Fetch the current stats and append them as one JSON line to DUMP_FILE.

    Returns the freshly fetched stats dict. Holds LOCK while writing so
    concurrent readers never see a half-written line.
    """
    with LOCK.acquire(timeout=10):
        snapshot = get_stats()
        with open(DUMP_FILE, 'a') as dump:
            dump.write(json.dumps(snapshot) + '\n')
        return snapshot
def last_n_with_cache(n, step_size=1):
    """Like last_n(), but serve from the in-memory cache when fresh.

    A (n, step_size) cache entry is refreshed at most every
    REFRESH_INTERVAL seconds. If the file lock cannot be acquired, the
    stale entry (or []) is served and the next call retries immediately.
    """
    key = '{}:{}'.format(n, step_size)
    # bug fix: timedelta.seconds drops the days component (and wraps at
    # 24h), so a cache idle for >1 day could look fresh again;
    # total_seconds() gives the true elapsed time
    time_since = (datetime.now() - LAST_CHECKED[key]).total_seconds()
    if key not in CACHED or not CACHED[key] or time_since >= REFRESH_INTERVAL:
        print('{}: refreshing cache:'.format(os.getpid()), key)
        # try to refresh, but if not possible, just use existing
        try:
            CACHED[key] = last_n(n, step_size)
            LAST_CHECKED[key] = datetime.now()
        except filelock.Timeout:
            print('couldnt acquire filelock, sending stale data')
            if key not in CACHED:
                CACHED[key] = []
            # set last checked time to force a re-check next query
            LAST_CHECKED[key] = datetime.now() - timedelta(seconds=REFRESH_INTERVAL)
    else:
        print('{}: loading from cache:'.format(os.getpid()), key)
    return CACHED[key]
def last_n(n, step_size=1):
    """Return the last *n* dump entries (oldest first), parsed from JSON,
    keeping every step_size-th line of the most recent n * step_size."""
    decoded = [raw.decode('utf8') for raw in tail(n * step_size)]
    decoded.reverse()
    return [json.loads(line) for line in decoded[::step_size]]
# <https://stackoverflow.com/a/6813975>
def tail(n):
    """Return the last n lines of DUMP_FILE as a list of bytes objects.

    Returns [] when the dump is missing or empty; may return fewer than
    n lines when the file is short. Holds LOCK while reading.

    Raises filelock.Timeout if the lock cannot be acquired within 5s.
    """
    # check if the last line is an empty line
    # so we can adjust n accordingly
    # (last_n(n=0) is non-empty exactly when the file does NOT end in a
    # newline, i.e. there is a trailing partial line; counting one fewer
    # newline below compensates for that extra fragment.
    # NOTE(review): that fragment is json.loads'd by last_n — presumably
    # assumed valid because writes are newline-terminated; confirm.)
    if n > 0 and last_n(n=0):
        n -= 1
    with LOCK.acquire(timeout=5):
        try:
            size = os.path.getsize(DUMP_FILE)
        except FileNotFoundError:
            # no snapshot has ever been written
            return []
        if size == 0:
            return []
        with open(DUMP_FILE, 'rb') as f:
            # map read-only so bytes can be indexed from the end without
            # loading the whole dump into memory
            fm = mmap.mmap(f.fileno(), 0, mmap.MAP_SHARED, mmap.PROT_READ)
            try:
                # scan backwards counting newlines; stop just past the
                # (n+1)-th so the slice below starts at line boundary
                for i in range(size - 1, -1, -1):
                    if fm[i] == ord('\n'):
                        n -= 1
                        if n == -1:
                            break
                # precedence: `i + 1 if i else 0` is `(i + 1) if i else 0`,
                # so when the scan exhausts the file (i == 0) the whole
                # file is returned — the "fewer than n lines" case
                return fm[i + 1 if i else 0:].splitlines()
            finally:
                fm.close()