Skip to content
This repository has been archived by the owner on May 4, 2021. It is now read-only.

Ticket28566 rebased on 29591 #345

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
2 changes: 1 addition & 1 deletion CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ When a PR is being reviewed, new changes might be needed:

- If the change does not modify a previous change, create new commits and push.
- If the change modifies a previous change and it's small,
`git commit fixup <https://git-scm.com/docs/git-commit#git-commit---fixupltcommitgt>`_
`git commit fixup <https://git-scm.com/docs/git-commit#Documentation/git-commit.txt---fixupltcommitgt>`_
should be used. When it is agreed that the PR is ready, create a new branch
named ``mybranch_02`` and run:

Expand Down
6 changes: 4 additions & 2 deletions sbws/core/scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,10 @@ def run_speedtest(args, conf):
# Call only once to initialize http_headers
settings.init_http_headers(conf.get('scanner', 'nickname'), state['uuid'],
str(controller.get_version()))

rl = RelayList(args, conf, controller)
# To do not have to pass args and conf to RelayList, pass an extra
# argument with the data_period
measurements_period = conf.getint('general', 'data_period')
rl = RelayList(args, conf, controller, measurements_period, state)
cb = CB(args, conf, controller, rl)
rd = ResultDump(args, conf)
rp = RelayPrioritizer(args, conf, rl, rd)
Expand Down
10 changes: 10 additions & 0 deletions sbws/globals.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@
# Tor already accept lines of any size, but leaving the limit anyway.
BW_LINE_SIZE = 1022

# RelayList, ResultDump, v3bwfile
# For how many seconds in the past the relays and measurements data is keep/
# considered valid.
# This is currently set by default in config.default.ini as ``date_period``,
# and used in ResultDump and v3bwfile.
# In a future refactor, constants in config.default.ini should be moved here,
# or calculated in settings, so that there's no need to pass the configuration
# to all the functions.
MEASUREMENTS_PERIOD = 5 * 24 * 60 * 60

# Metadata to send in every requests, so that data servers can know which
# scanners are using them.
# In Requests these keys are case insensitive.
Expand Down
196 changes: 183 additions & 13 deletions sbws/lib/relaylist.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,62 @@
import copy
from datetime import datetime, timedelta

from stem.descriptor.router_status_entry import RouterStatusEntryV3
from stem.descriptor.server_descriptor import ServerDescriptor
from stem import Flag, DescriptorUnavailable, ControllerError
import random
import time
import logging
from threading import Lock

from ..globals import MEASUREMENTS_PERIOD

log = logging.getLogger(__name__)


def remove_old_consensus_timestamps(
juga0 marked this conversation as resolved.
Show resolved Hide resolved
consensus_timestamps, measurements_period=MEASUREMENTS_PERIOD):
"""
Remove the consensus timestamps that are older than period for which
the measurements are keep from a list of consensus_timestamps.

:param list consensus_timestamps:
:param int measurements_period:
:returns list: a new list of ``consensus_timestamps``
"""
oldest_date = datetime.utcnow() - timedelta(measurements_period)
new_consensus_timestamps = \
[t for t in consensus_timestamps if t >= oldest_date]
return new_consensus_timestamps


def valid_after_from_network_statuses(network_statuses):
juga0 marked this conversation as resolved.
Show resolved Hide resolved
"""Obtain the consensus Valid-After datetime from the ``document``
attribute of a ``stem.descriptor.RouterStatusEntryV3``.

:param list network_statuses:
returns datetime:
"""
for ns in network_statuses:
document = getattr(ns, 'document', None)
if document:
valid_after = getattr(document, 'valid_after', None)
if valid_after:
return valid_after
return datetime.utcnow().replace(microsecond=0)


class Relay:
def __init__(self, fp, cont, ns=None, desc=None):
def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
juga0 marked this conversation as resolved.
Show resolved Hide resolved
'''
Given a relay fingerprint, fetch all the information about a relay that
sbws currently needs and store it in this class. Acts as an abstraction
to hide the confusion that is Tor consensus/descriptor stuff.

:param str fp: fingerprint of the relay.
:param cont: active and valid stem Tor controller connection

:param datatime timestamp: the timestamp of a consensus
(RouterStatusEntryV3) from which this relay has been obtained.
'''
assert isinstance(fp, str)
assert len(fp) == 40
Expand All @@ -38,6 +77,8 @@ def __init__(self, fp, cont, ns=None, desc=None):
self._desc = cont.get_server_descriptor(fp, default=None)
except (DescriptorUnavailable, ControllerError) as e:
log.exception("Exception trying to get desc %s", e)
self._consensus_timestamps = []
self._add_consensus_timestamp(timestamp)

def _from_desc(self, attr):
if not self._desc:
Expand Down Expand Up @@ -107,6 +148,68 @@ def master_key_ed25519(self):
return None
return key.rstrip('=')

@property
def consensus_valid_after(self):
juga0 marked this conversation as resolved.
Show resolved Hide resolved
"""Obtain the consensus Valid-After from the document of this relay
network status.
"""
network_status_document = self._from_ns('document')
if network_status_document:
return getattr(network_status_document, 'valid_after', None)
return None

@property
def last_consensus_timestamp(self):
if len(self._consensus_timestamps) >= 1:
return self._consensus_timestamps[-1]
return None

def _add_consensus_timestamp(self, timestamp=None):
"""Add the consensus timestamp in which this relay is present.
"""
# It is possible to access to the relay's consensensus Valid-After
if self.consensus_valid_after is not None:
# The consensus timestamp list was initialized.
if self.last_consensus_timestamp is not None:
# Valid-After is more recent than the most recent stored
# consensus timestamp.
if self.consensus_valid_after > self.last_consensus_timestamp:
# Add Valid-After
self._consensus_timestamps.append(
self.consensus_valid_after
)
# The consensus timestamp list was not initialized.
else:
# Add Valid-After
self._consensus_timestamps.append(self.consensus_valid_after)
# If there was already a list the timestamp arg is more recent than
# the most recent timestamp stored,
elif (self.last_consensus_timestamp is not None
and timestamp > self.last_consensus_timestamp):
# Add the arg timestamp.
self._consensus_timestamps.append(timestamp)
# In any other case
else:
# Add the current datetime
self._consensus_timestamps.append(
datetime.utcnow().replace(microsecond=0))

def _remove_old_consensus_timestamps(
juga0 marked this conversation as resolved.
Show resolved Hide resolved
self, measurements_period=MEASUREMENTS_PERIOD):
self._consensus_timestamps = \
remove_old_consensus_timestamps(
copy.deepcopy(self._consensus_timestamps, measurements_period)
)

def update_consensus_timestamps(self, timestamp=None):
self._add_consensus_timestamp(timestamp)
self._remove_old_consensus_timestamps()

@property
def relay_in_recent_consensus_count(self):
"""Number of times the relay was in a conensus."""
return len(self._consensus_timestamps)

def can_exit_to_port(self, port):
"""
Returns True if the relay has an exit policy and the policy accepts
Expand All @@ -129,16 +232,40 @@ class RelayList:
transparently in the background. Provides useful interfaces for getting
only relays of a certain type.
'''
REFRESH_INTERVAL = 300 # seconds

def __init__(self, args, conf, controller):
def __init__(self, args, conf, controller,
measurements_period=MEASUREMENTS_PERIOD, state=None):
self._controller = controller
self.rng = random.SystemRandom()
self._refresh_lock = Lock()
# To track all the consensus seen.
self._consensus_timestamps = []
# Initialize so that there's no error trying to access to it.
# In future refactor, change to a dictionary, where the keys are
# the relays' fingerprint.
self._relays = []
# The period of time for which the measurements are keep.
self._measurements_period = measurements_period
self._state = state
self._refresh()

def _need_refresh(self):
juga0 marked this conversation as resolved.
Show resolved Hide resolved
return time.time() >= self._last_refresh + self.REFRESH_INTERVAL
# New consensuses happen every hour.
return datetime.utcnow() >= \
self.last_consensus_timestamp + timedelta(seconds=60*60)

@property
def last_consensus_timestamp(self):
"""Returns the datetime when the last consensus was obtained."""
if (getattr(self, "_consensus_timestamps")
and self._consensus_timestamps):
return self._consensus_timestamps[-1]
# If the object was not created from __init__, it won't have
# consensus_timestamps attribute or it might be empty.
# In this case force new update.
# Anytime more than 1h in the past will be old.
self._consensus_timestamps = []
return datetime.utcnow() - timedelta(seconds=60*61)

@property
def relays(self):
Expand Down Expand Up @@ -197,19 +324,62 @@ def _relays_with_flag(self, flag):
def _relays_without_flag(self, flag):
return [r for r in self.relays if flag not in r.flags]

def _remove_old_consensus_timestamps(self):
juga0 marked this conversation as resolved.
Show resolved Hide resolved
self._consensus_timestamps = remove_old_consensus_timestamps(
copy.deepcopy(self._consensus_timestamps),
self._measurements_period
)

def _init_relays(self):
"""Returns a new list of relays that are in the current consensus.
And update the consensus timestamp list with the current one.

"""
c = self._controller
try:
relays = [Relay(ns.fingerprint, c, ns=ns)
for ns in c.get_network_statuses()]
except ControllerError as e:
log.exception("Exception trying to init relays %s", e)
return []
return relays
# This will get router statuses from this Tor cache, might not be
# updated with the network.
# Change to stem.descriptor.remote in future refactor.
network_statuses = c.get_network_statuses()
new_relays_dict = dict([(r.fingerprint, r) for r in network_statuses])

# Find the timestamp of the last consensus.
timestamp = valid_after_from_network_statuses(network_statuses)
self._consensus_timestamps.append(timestamp)
self._remove_old_consensus_timestamps()
# Update the relays that were in the previous consensus with the
# new timestamp
new_relays = []
relays = copy.deepcopy(self._relays)
for r in relays:
if r.fingerprint in new_relays_dict.keys():
r.update_consensus_timestamps(timestamp)
new_relays_dict.pop(r.fingerprint)
new_relays.append(r)

# Add the relays that were not in the previous consensus
# If there was an relay in some older previous consensus,
# it won't get stored, so its previous consensuses are lost,
# but probably this is fine for now to don't make it more complicated.
for fp, ns in new_relays_dict.items():
r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp)
new_relays.append(r)
return new_relays

def _refresh(self):
# Set a new list of relays.
self._relays = self._init_relays()
self._last_refresh = time.time()

log.info("Number of consensuses obtained in the last %s days: %s.",
int(self._measurements_period / 24 / 60 / 60),
self.recent_consensus_count)
# NOTE: blocking, writes to file!
if self._state is not None:
self._state['recent_consensus_count'] = self.recent_consensus_count

@property
def recent_consensus_count(self):
"""Number of times a new consensus was obtained."""
return len(self._consensus_timestamps)

def exits_not_bad_allowing_port(self, port):
return [r for r in self.exits
Expand Down
Loading