Merge pull request #15 from sbng/scrape-staleness
Scrape output based on Prometheus format
bewandered authored Oct 29, 2024
2 parents 9970da1 + 2b87eb8 commit 9aa3698
Showing 3 changed files with 217 additions and 46 deletions.
24 changes: 24 additions & 0 deletions README.md
@@ -54,3 +54,27 @@ tidy up first, mainly to regenerate ra-dns-check.sh, like so:

After the initial install of the venv or python dependencies, you probably
don't need to do these again, unless the dependencies have changed.

## Additional features

* Added a `--scrape` argument. It uses the time supplied via `--datetime1` to obtain the
  `rt` (response time) of each DNS query. The output is formatted so that Prometheus can
  scrape it; a sample of the scrape output appears after the examples below.
* Additionally, the `--probes '<list of probes>'` argument limits the output to the
  probes given in `<list of probes>`.
* If `--datetime1` is omitted, the latest data set (based on time now()) is output.
```
ra-dns-check.py --datetime1 202210080000 12016241 --scrape
ra-dns-check.py --datetime1 202210080000 12016241 --scrape --probes "999,99"
ra-dns-check.py 64573001 --scrape
```
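The scrape output follows the Prometheus exposition format. An illustrative sample is
shown below (label values are hypothetical, and some labels are omitted for brevity):
```
# HELP ripe_atlas_latency The number of milliseconds for response reported by this probe for the time period requested on this measurement
# TYPE ripe_atlas_latency gauge
ripe_atlas_latency{measurement_id="12016241",probe_id="928",version="4",target_ip="9.9.9.9",probe_country="DE",sample_reported_pop="ams",sample_reported_host="res100"} 23.4
```
A trailing Unix timestamp is appended to each sample when `--include_probe_timestamp`
is set or `--datetime1` is supplied.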
* Added autocomplete support via the argcomplete module, which enables tab-key completion of commands and arguments (the note after this example shows how to make it persistent).
```
pip install -r requirements.txt
ra-dns-check.py --scrape 43869257 --autocomplete
source <(register-python-argcomplete ra-dns-check.py)
```
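Note that the `source` line only affects the current shell; to make completion
persistent, the registration can be added to a shell startup file (bash assumed here):
```
echo 'eval "$(register-python-argcomplete ra-dns-check.py)"' >> ~/.bashrc
```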
* Enforce staleness of data in the Prometheus scrape feature. A record is stale when the gap between now() and its timestamp exceeds the staleness period (in seconds); stale records are skipped and logged at level INFO (a sample log line follows the example below).
```
# pull the latest data from Oct 8 2022 for measurement 12016241 (probes 928 and 975)
ra-dns-check.py --datetime1 202210080000 12016241 --scrape --scrape_staleness_seconds 38588031 --probes "928,975" --log_level INFO
```
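When a record falls outside the staleness window, it is skipped and logged at level
INFO; assuming a default logging format, a skipped record produces a line like:
```
INFO:root:Skipping probe 928 - sample is 90061 seconds old
```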
238 changes: 192 additions & 46 deletions ra-dns-check.py
@@ -6,7 +6,7 @@

# Please see the file LICENSE for the license.

import argparse
import argparse, argcomplete
# need ast to more safely parse config file
import ast
import configparser
@@ -21,6 +21,7 @@
from datetime import datetime
# to decompress RIPE Atlas probe data file
import bz2
import base64
# needed to fetch the probe properties file from RIPE
import urllib.request
# These RIPE python modules are usually installed with pip:
@@ -31,11 +31,13 @@
from ripe.atlas.cousteau import Measurement
# for debugging
from pprint import pprint

#
# Valid log levels
valid_log_levels = ['DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL']

# Valid id.server handling methods
valid_id_server_method = ['quad9','google','cloudflare']

# Change "CRITICAL" to DEBUG if you want debugging-level logging *before* this
# script parses command line args, and subsequently sets the debug level:
logging.basicConfig(level=logging.CRITICAL)
@@ -188,10 +191,35 @@
'default': 86400,
'help': 'The max age (seconds) of the RIPE Atlas probe info file (older than this and we download a new one). Default: 86400',
'type': 'integer'},
'scrape': {
'default': False,
'help': 'Scrape output for Prometheus',
'type': 'boolean'},
'include_probe_timestamp': {
'default': False,
'help': 'Timestamps are not shown in the Prometheus output by default; toggle this switch to display them',
'type': 'boolean'},
'autocomplete': {
'default': False,
'help': 'Enable shell tab completion for all available options',
'type': 'boolean'},
'probes': {
'default': [],
'help': 'Comma-separated list of probe IDs to include, e.g. "919,166,1049"',
'type': 'list'},
'id_servermethod': {
'default': 'quad9',
'help': 'Method used to interpret the id.server response; options: quad9, google, cloudflare',
'type': 'string'},
'exclusion_list_file': {
'default': None,
'help': 'Filename for probe ID exclusion list',
'type': 'string'},
'scrape_staleness_seconds': {
'default': int(time.time()),
'help': 'Records whose age (now() minus record timestamp) exceeds this value in seconds are ignored',
'type': 'integer'},

}

sample_config_string = sample_config_string_header
@@ -260,8 +288,15 @@
parser.add_argument('-S', '--slow_threshold', help=options_sample_dict['slow_threshold']['help'], type=int, default=options_sample_dict['slow_threshold']['default'])
parser.add_argument('-t', '--split_char', help=options_sample_dict['split_char']['help'], type=str, default=options_sample_dict['split_char']['default'])
parser.add_argument('-u', '--print_summary_stats', help=options_sample_dict['print_summary_stats']['help'], action='store_true', default=options_sample_dict['print_summary_stats']['default'])
parser.add_argument('--scrape', help=options_sample_dict['scrape']['help'], action='store_true', default=options_sample_dict['scrape']['default'])
parser.add_argument('--include_probe_timestamp', help=options_sample_dict['include_probe_timestamp']['help'], action='store_true', default=options_sample_dict['include_probe_timestamp']['default'])
parser.add_argument('--autocomplete', help=options_sample_dict['autocomplete']['help'], action='store_true', default=options_sample_dict['autocomplete']['default'])
parser.add_argument('--probes', help=options_sample_dict['probes']['help'], type=str, default=options_sample_dict['probes']['default'])
parser.add_argument('--id_servermethod', help=options_sample_dict['id_servermethod']['help'], type=str, choices=valid_id_server_method, default=options_sample_dict['id_servermethod']['default'])
parser.add_argument('--scrape_staleness_seconds', help=options_sample_dict['scrape_staleness_seconds']['help'], type=int, default=options_sample_dict['scrape_staleness_seconds']['default'])
parser.add_argument('filename_or_msmid', help='one or two local filenames or RIPE Atlas Measurement IDs', nargs='+')
parser.format_help()
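# argcomplete must hook the parser after all options are defined and before arguments are parsed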
argcomplete.autocomplete(parser)
args = parser.parse_known_args()
###pprint(args)

@@ -413,8 +448,8 @@ def user_datetime_to_valid_unixtime(user_dt_string):
# It's not unix time, so try to convert from some data time formats
for f in accepted_datetime_formats:
try:
# print (user_dt_string + ' / ' + f)
_unixtime_candidate = int(time.mktime(time.strptime(user_dt_string, f)))
# print (user_dt_string + ' / ' + f)
# time.mktime() interprets the parsed struct_time as local time, so subtract
# time.timezone to offset the result to UTC
_unixtime_candidate = int(time.mktime(time.strptime(user_dt_string, f))) - time.timezone
if is_valid_unixtime(_unixtime_candidate):
logger.debug('Accepted %i as valid unixtime.\n' % _unixtime_candidate)
return (_unixtime_candidate)
@@ -506,6 +541,13 @@ class fmt:

# def id(self):

# If the autocomplete option is set, print the command that registers this
# script for shell completion, then exit. (Invoking `source` via os.system
# cannot work: it runs in a child shell that exits without affecting the
# caller's shell, so the user must run the printed command themselves.)
if args[0].autocomplete:
    prog = sys.argv[0].removeprefix("./")
    print(f'source <(register-python-argcomplete {prog})')
    exit()

# Validate the supplied date-times and stick them in a list
if args[0].datetime1:
logger.debug(args[0].datetime1)
@@ -558,7 +600,7 @@ class fmt:
# Process the data, either from a local file or by requesting it over the
# 'net from RIPE Atlas.
#
def process_request(_data_source, _results_set_id, _unixtime):
def process_request(_data_source, _results_set_id, _unixtime, probes = []):
logger.info('Trying to access data_source %s for unixtime %s\n' % (_data_source, _unixtime))
# First we try to open the _data_source as a local file. If it exists,
# read in the measurement results from a filename the user has
@@ -589,7 +631,8 @@ def process_request(_data_source, _results_set_id, _unixtime):
# If we have no unixtime to request results from, then we get the latest results
if _unixtime == 0:
kwargs = {
"msm_id": measurement_id
"msm_id": measurement_id,
"probe_ids": probes
}
logger.info('Fetching latest results for Measurement %i from RIPE Atlas API...\n' % measurement_id)
is_success, results = AtlasLatestRequest(**kwargs).create()
@@ -603,7 +646,8 @@
kwargs = {
"msm_id": measurement_id,
"start": _unixtime,
"stop": _stop_time
"stop": _stop_time,
"probe_ids": probes
}
logger.info('Fetching results for Measurement %i, start unixtime: %s stop unixtime: %s\n' % (measurement_id, _unixtime, _stop_time))
is_success, results = AtlasResultsRequest(**kwargs).create()
@@ -736,7 +780,7 @@ def process_request(_data_source, _results_set_id, _unixtime):
m_timestamps[_results_set_id].sort()
m_seen_probe_ids[_results_set_id].sort()
logger.debug('m_seen_probe_ids[_results_set_id] is %d\n' % len(m_seen_probe_ids[_results_set_id]))
return measurement_id
return measurement_id, results

# END def process_request
####################
@@ -853,61 +897,164 @@ def load_probe_properties(probe_ids, ppcf):
all_probes_dict = {}
#
logger.info ('Reading the probe data dictionary as a JSON file from %s...\n' % ppcf)
try:
with open(ppcf, 'r') as f:
all_probes_dict = json.load(f)
except:
logger.critical ('Cannot read probe data from file: %s\n' % ppcf)
exit(13)
while True:
try:
with open(ppcf, 'r') as f:
all_probes_dict = json.load(f)
except:
logger.critical ('Cannot read probe data from file: %s\n' % ppcf)
logger.critical ('Regenerating probe data to file: %s\n' % ppcf)
_res = check_update_probe_properties_cache_file(config['ripe_atlas_probe_properties_raw_file'],
config['ripe_atlas_probe_properties_json_cache_file'],
config['ripe_atlas_current_probe_properties_url'])
if _res != 0:
logger.critical('Unexpected result when updating local cache files: %s' % _res)
continue
break
# Loop through the list of supplied (seen) probe ids and collect their
# info/meta data from either our local file or the RIPE Atlas API
logger.info ('Matching seen probes with probe data; will query RIPE Atlas API for probe info not in local cache...\n')
for p in probe_ids:
if p in all_probes_dict.keys():
probe_cache_hits += 1
logger.debug('Probe %s info found in local cache.' % p)
matched_probe_info[p] = all_probes_dict[p]
else:
# If it's not in the cache file, request it from RIPE
#logger.debug ('NOT cached, trying RIPE Atlas...')
try:
ripe_result = Probe(id=p)
#
matched_probe_info[p] = {'asn_v4': ripe_result.asn_v4,
'asn_v6': ripe_result.asn_v6,
'country_code': ripe_result.country_code,
'address_v4': ripe_result.address_v4,
'address_v6': ripe_result.address_v6}
probe_cache_misses += 1
all_probes_dict[p] = matched_probe_info[p]
logger.debug('Probe %9s info fetched from RIPE' % p)
except:
# Otherwise, it's empty
# we did not find any information about the probe, so set values to '-'
matched_probe_info[p] = { 'asn_v4': '-',
'asn_v6': '-',
'country_code': '-',
'address_v4': '-',
'address_v6': '-' }
logger.debug('Failed to get info about probe ID %s in the local cache or from RIPE Atlas API.' % p)
# Use Python set() operations instead of checking cache hits and misses
# one by one: set elements are unique, so a set difference directly yields
# the probes not yet in the cache, reducing computational complexity
dns_probes = set(str(x) for x in probe_ids)
all_probes = set(all_probes_dict.keys())
new_probes = dns_probes.difference(all_probes)
for i in (dns_probes - new_probes):
matched_probe_info[i] = all_probes_dict[i]
for p in new_probes:
try:
ripe_result = Probe(id=p)
matched_probe_info[p] = {'asn_v4': ripe_result.asn_v4,
'asn_v6': ripe_result.asn_v6,
'country_code': ripe_result.country_code,
'lat': ripe_result.geometry['coordinates'][1],
'lon': ripe_result.geometry['coordinates'][0],
'address_v4': ripe_result.address_v4,
'address_v6': ripe_result.address_v6}
all_probes_dict[p] = matched_probe_info[p]
logger.debug('Probe %9s info fetched from RIPE' % p)
except:
# Otherwise, it's empty
# we did not find any information about the probe, so set values to '-'
matched_probe_info[p] = { 'asn_v4': '-',
'asn_v6': '-',
'country_code': '-',
'lat': '-',
'lon': '-',
'address_v4': '-',
'address_v6': '-' }
logger.debug('Failed to get info about probe ID %s in the local cache or from RIPE Atlas API.' % p)
logger.info('cache hits: %i cache misses: %i.\n' % (probe_cache_hits, probe_cache_misses))
# Write out the local JSON cache file
with open(ppcf, mode='w') as f:
json.dump(all_probes_dict, f)
if len(new_probes) != 0:
with open(ppcf, mode='w') as f:
json.dump(all_probes_dict, f)
return(matched_probe_info)
####################
#
# Input a dictionary -> Output a single line of scrape formatted string
def dict_string(d):
dict_str = ""
for i,v in d.items():
# Ignore the string output if the value of a key is "None"
if v == "None":
dict_str += str(i) + "=\"" + "unknown" + "\","
else:
dict_str += str(i) + "=\"" + str(v) + "\","
return dict_str.rstrip(",")
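# For example (illustrative): dict_string({'probe_id': '928', 'asn_v6': 'None'})
# returns 'probe_id="928",asn_v6="unknown"'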

#####################
#
# Input base64-decoded abuf bytes -> output the last whitespace-separated word of the string
def decode_base64(abuf):
locate_nsid = re.sub(r'(\\x..)|\'|(\\t)',' ',str(abuf)).split()
return locate_nsid[-1]
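# For example (hypothetical value): if the decoded abuf's printable tail is
# 'res100.ams.rrdns.pch.net', that last word is returned as the NSID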

#####################
#
# Input any string -> output with special characters removed; '-' is preserved by mapping it to '_'
def sanitize_string(s):
# Regex to match special characters and remove it however need to preserve '-'
return re.sub(r'\W+','',s.replace('-','_'))
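# For example: sanitize_string('res100.ams-3') returns 'res100ams_3'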

#####################
#
# Input a timestamp and a staleness window (seconds) -> None if the timestamp's age exceeds the window, else the timestamp unchanged
def check_freshness(timestamp,staleness):
freshness = int(time.time()) - timestamp
if (freshness > staleness):
return None
else:
return timestamp
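# For example, with staleness=3600 a timestamp two hours old returns None,
# while one five minutes old is returned unchanged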

# END of all function defs
##################################################

######
#
# Data loading and summary stats reporting loop ...

try:
probes = args[0].probes.split(',')
except:
probes = []

if args[0].scrape:
staleness = args[0].scrape_staleness_seconds
m, dnsresult = process_request(data_sources[results_set_id], results_set_id, unixtimes[results_set_id], probes)
p_probe_properties = load_probe_properties([dnsresult[x]['prb_id'] for x in range(len(dnsresult))], config['ripe_atlas_probe_properties_json_cache_file'])
print ('''# HELP ripe_atlas_latency The number of milliseconds for response reported by this probe for the time period requested on this measurement
# TYPE ripe_atlas_latency gauge ''')
for dnsprobe in dnsresult:
try:
probe_num = str(dnsprobe['prb_id'])
delay = dnsprobe['result']['rt']
timestamp = check_freshness(dnsprobe['timestamp'],staleness)
if timestamp != None:
ripe_atlas_latency = { 'measurement_id' : str(dnsprobe['msm_id']),
'probe_id' : str(dnsprobe['prb_id']),
'version' : str(dnsprobe['af']),
'target_ip' : str(dnsprobe['dst_addr']),
'probe_asn_v4' : str(p_probe_properties[probe_num]['asn_v4']),
'probe_address_v4' : str(dnsprobe['from']),
'probe_asn_v6' : str(p_probe_properties[probe_num]['asn_v6']),
'probe_address_v6' : str(p_probe_properties[probe_num]['address_v6']),
'probe_country' : str(p_probe_properties[probe_num]['country_code']),
'probe_lat' : str(p_probe_properties[probe_num]['lat']),
'probe_lon' : str(p_probe_properties[probe_num]['lon']),
}
method = args[0].id_servermethod
if method == 'quad9':
nsid = decode_base64(base64.b64decode(str(dnsprobe['result']['abuf'])))
ripe_atlas_latency['sample_reported_pop'] = sanitize_string(str(nsid.split('.')[1]))
ripe_atlas_latency['sample_reported_host'] = sanitize_string(str(nsid.split('.')[0]))
elif method == 'cloudflare':
ripe_atlas_latency['sample_reported_pop'] = sanitize_string(str(dnsprobe['result']['answers'][0]['RDATA'][0]))
ripe_atlas_latency['sample_reported_host'] = "unknown"
elif method == 'google':
nsid = decode_base64(base64.b64decode(str(dnsprobe['result']['abuf'])))
ripe_atlas_latency['sample_reported_pop'] = sanitize_string(str(nsid))
ripe_atlas_latency['sample_reported_host'] = "unknown"
else:
ripe_atlas_latency['sample_reported_pop'] = "unknown"
ripe_atlas_latency['sample_reported_host'] = "unknown"
labels = dict_string(ripe_atlas_latency)
if (args[0].include_probe_timestamp) or (args[0].datetime1 != None) :
print (f'ripe_atlas_latency{{{labels}}} {delay} {timestamp}')
else:
print (f'ripe_atlas_latency{{{labels}}} {delay}')
else:
logger.info('Skipping probe %s - sample is %i seconds old' % (probe_num,int(time.time() - dnsprobe['timestamp'])))
except:
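# records missing an expected field (e.g. failed queries) are skipped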
pass
exit()

while results_set_id <= last_results_set_id:
#for t in data_sources:
# m will receive the measurement ID for the processed data source
logger.debug('data_source: %s results_set_id: %i unixtime: %i\n' % (data_sources[results_set_id], results_set_id, unixtimes[results_set_id]))
m = process_request(data_sources[results_set_id], results_set_id, unixtimes[results_set_id])
m, r = process_request(data_sources[results_set_id], results_set_id, unixtimes[results_set_id], probes)
measurement_ids.append(m)
######
# Summary stats
@@ -941,7 +1088,6 @@ def load_probe_properties(probe_ids, ppcf):
#
results_set_id += 1
# end of Data loading and summary stats reporting loop

########################################

# Check to see if there are two sets of results. If there are, see if
1 change: 1 addition & 0 deletions requirements.txt
@@ -18,3 +18,4 @@ socketIO-client==0.7.2
tzlocal==2.1
urllib3==1.26.19
websocket-client==0.58.0
argcomplete==2.0.0
