diff --git a/clean-redis.py b/clean-redis.py index c637f89..4b42fb5 100644 --- a/clean-redis.py +++ b/clean-redis.py @@ -82,9 +82,9 @@ def initialize_logger(debug_mode=True): queue=QUEUE, stale_time=STALE_TIME) - _logger.info('Janitor initialized. ' - 'Cleaning queues `%s` and `%s:*` every `%s` seconds.', - janitor.queue, janitor.processing_queue, INTERVAL) + queues = ' and '.join('`%s:*`' % q for q in janitor.processing_queues) + _logger.info('Janitor initialized. Cleaning queues `%s` and %s every %ss.', + janitor.queue, queues, INTERVAL) while True: try: diff --git a/pytest.ini b/pytest.ini index 023c89d..8186dc2 100644 --- a/pytest.ini +++ b/pytest.ini @@ -18,3 +18,6 @@ norecursedirs= # W503 line break occurred before a binary operator pep8ignore=* E731 + +# Enable line length testing with maximum line length of 85 +pep8maxlinelength = 85 diff --git a/redis_janitor/janitors.py b/redis_janitor/janitors.py index 477a3fe..79026f4 100644 --- a/redis_janitor/janitors.py +++ b/redis_janitor/janitors.py @@ -63,10 +63,17 @@ def __init__(self, # attributes for managing pod state self.whitelisted_pods = ['zip-consumer'] - self.valid_pod_phases = {'Running', 'Pending'} + self.valid_pod_phases = { + 'Running', + # 'Pending', + # 'ContainerCreating' + } self.total_repairs = 0 - self.processing_queue = 'processing-{}'.format(self.queue) + self.processing_queues = [ + 'processing-{}'.format(self.queue), + 'processing-{}-zip'.format(self.queue) + ] self.cleaning_queue = '' # update this in clean() def get_core_v1_client(self): @@ -118,9 +125,11 @@ def list_namespaced_pod(self): return response.items def get_processing_keys(self, count=100): - match = '{}:*'.format(self.processing_queue) - processing_keys = self.redis_client.scan_iter(match=match, count=count) - return processing_keys + for q in self.processing_queues: + match = '{}:*'.format(q) + keys = self.redis_client.scan_iter(match=match, count=count) + for key in keys: + yield key def is_whitelisted(self, pod_name): """Ignore missing pods that are whitelisted""" @@ -131,7 +140,7 @@ def remove_key_from_queue(self, redis_key): start = timeit.default_timer() res = self.redis_client.lrem(self.cleaning_queue, 1, redis_key) if res: - self.logger.debug('Removed key `%s` from %s` in %s seconds.', + self.logger.debug('Removed key `%s` from `%s` in %s seconds.', redis_key, self.cleaning_queue, timeit.default_timer() - start) else: @@ -143,10 +152,15 @@ def repair_redis_key(self, redis_key): is_removed = self.remove_key_from_queue(redis_key) if is_removed: start = timeit.default_timer() - self.redis_client.lpush(self.queue, redis_key) + source_queue = self.cleaning_queue.split(':')[0] + source_queue = source_queue.split('processing-')[-1] + self.redis_client.lpush(source_queue, redis_key) self.logger.debug('Pushed key `%s` to `%s` in %s seconds.', - redis_key, self.queue, + redis_key, source_queue, timeit.default_timer() - start) + else: + self.logger.warning('Tried to repair key %s but it was no longer ' + 'in %s', redis_key, self.cleaning_queue) return is_removed def _update_pods(self): @@ -170,61 +184,91 @@ def update_pods(self): def is_valid_pod(self, pod_name): self.update_pods() # only updates if stale - is_valid = pod_name in self.pods + is_valid = False + if pod_name in self.pods: + pod_phase = self.pods[pod_name].status.phase + if pod_phase in self.valid_pod_phases: + is_valid = True return is_valid + def _timestamp_to_age(self, ts): + if ts is None: + return 0 # key is new + + if isinstance(ts, str): + # TODO: `dateutil` deprecated by python 3.7 `fromisoformat` + # ts = datetime.datetime.fromisoformat(ts) + ts = dateutil.parser.parse(ts) + current_time = datetime.datetime.now(pytz.UTC) + diff = current_time - ts + return diff.total_seconds() + def is_stale_update_time(self, updated_time, stale_time=None): stale_time = stale_time if stale_time else self.stale_time if not updated_time: return False if not stale_time > 0: return False - if isinstance(updated_time, str): - # TODO: `dateutil` deprecated by python 3.7 `fromisoformat` - # updated_time = datetime.datetime.fromisoformat(updated_time) - updated_time = dateutil.parser.parse(updated_time) - current_time = datetime.datetime.now(pytz.UTC) - update_diff = current_time - updated_time - return update_diff.total_seconds() >= stale_time - - def clean_key(self, key): - hvals = self.redis_client.hgetall(key) + last_updated = self._timestamp_to_age(updated_time) + return last_updated >= stale_time + def should_clean_key(self, key, updated_ts): + """Return a boolean if the key should be cleaned""" pod_name = self.cleaning_queue.split(':')[-1] - is_valid_pod = self.is_valid_pod(pod_name) - is_stale = self.is_stale_update_time(hvals.get('updated_at')) - - if is_valid_pod and not is_stale: # pylint: disable=R1705 + updated_seconds = self._timestamp_to_age(updated_ts) + + if updated_seconds <= self.pod_refresh_interval * 3: + return False # this is too fresh for our pod data + + if self.is_valid_pod(pod_name): # pod exists in a valid state + # if not self.is_stale_update_time(updated_ts): + # return False # pod exists and key is updated recently + # + # # pod exists but key is stale + # self.logger.warning('Key `%s` in queue `%s` was last updated at ' + # '`%s` (%s seconds ago) and pod `%s` is still ' + # 'alive with status %s but is_stale turned off.', + # key, self.cleaning_queue, updated_ts, + # updated_seconds, pod_name, + # self.pods[pod_name].status.phase) + # # self.kill_pod(pod_name, self.namespace) return False - elif is_stale and not is_valid_pod: - self.logger.warning('Key `%s` in queue `%s` was last updated at ' - '`%s` and pod `%s` is still alive.', - key, self.cleaning_queue, - hvals.get('updated_at'), pod_name) - # self.kill_pod(pod_name, self.namespace) - - elif not is_stale and not is_valid_pod: + # pod is not valid + if pod_name not in self.pods: # pod does not exist self.logger.info('Key `%s` in queue `%s` was last updated by pod ' - '`%s`, but that pod does not exist.', - key, self.cleaning_queue, pod_name) - - else: - self.logger.info('Key `%s` in queue `%s` was last updated at `%s` ' - 'by pod `%s` which no longer exists.', - key, self.cleaning_queue, - hvals.get('updated_at'), pod_name) - - key_status = hvals.get('status') + '`%s` %s seconds ago, but that pod does not ' + 'exist.', key, self.cleaning_queue, pod_name, + updated_seconds) + else: # pod exists but has a bad status + self.logger.info('Key `%s` in queue `%s` was last updated by ' + 'pod `%s` %s seconds ago, but that pod has status' + ' %s.', key, self.cleaning_queue, pod_name, + updated_seconds, self.pods[pod_name].status.phase) + return True - # key is stale, must be repaired somehow - if key_status in {'done', 'failed'}: - # job is finished, no need to restart the key - return bool(self.remove_key_from_queue(key)) - - # if the job is finished, no need to restart the key - return bool(self.repair_redis_key(key)) + def clean_key(self, key): + required_keys = [ + 'status', + 'updated_at', + 'updated_by', + ] + res = self.redis_client.hmget(key, *required_keys) + hvals = {k: v for k, v in zip(required_keys, res)} + + should_clean = self.should_clean_key(key, hvals.get('updated_at')) + + if should_clean: + # key in the processing queue is either stranded or stale + # if the key is finished already, just remove it from the queue + if hvals.get('status') in {'done', 'failed'}: + return bool(self.remove_key_from_queue(key)) + + # if the job is not finished, repair the key + return bool(self.repair_redis_key(key)) + + return should_clean def clean(self): cleaned = 0 diff --git a/redis_janitor/janitors_test.py b/redis_janitor/janitors_test.py index 87366ad..7bf34d4 100644 --- a/redis_janitor/janitors_test.py +++ b/redis_janitor/janitors_test.py @@ -59,6 +59,22 @@ def __init__(self, prefix='predict', status='new'): '{}:{}:{}'.format('other', self.status, 'x.zip'), ] + def _get_dummy_data(self, rhash, identity, updated): + return { + 'model_name': 'model', + 'model_version': '0', + 'field': '61', + 'cuts': '0', + 'updated_by': identity, + 'status': rhash.split(':')[-1], + 'postprocess_function': '', + 'preprocess_function': '', + 'file_name': rhash.split(':')[-1], + 'input_file_name': rhash.split(':')[-1], + 'output_file_name': rhash.split(':')[-1], + 'updated_at': updated, + } + def scan_iter(self, match=None, count=None): for k in self.keys: if match: @@ -82,6 +98,16 @@ def hset(self, rhash, status, value): def lrange(self, queue, start, end): return self.keys[start:end] + def hmget(self, rhash, *keys): + now = datetime.datetime.now(pytz.UTC) + later = (now - datetime.timedelta(minutes=60)) + identity = 'good_pod' if 'good' in rhash else 'bad_pod' + identity = 'zip-consumer' if 'whitelist' in rhash else identity + updated = (later if 'stale' in rhash else now).isoformat(' ') + updated = None if 'malformed' in rhash else updated + dummy = self._get_dummy_data(rhash, identity, updated) + return [dummy.get(k) for k in keys] + def hgetall(self, rhash): now = datetime.datetime.now(pytz.UTC) later = (now - datetime.timedelta(minutes=60)) @@ -89,20 +115,8 @@ def hgetall(self, rhash): identity = 'zip-consumer' if 'whitelist' in rhash else identity updated = (later if 'stale' in rhash else now).isoformat(' ') updated = None if 'malformed' in rhash else updated - return { - 'model_name': 'model', - 'model_version': '0', - 'field': '61', - 'cuts': '0', - 'updated_by': identity, - 'status': rhash.split(':')[-1], - 'postprocess_function': '', - 'preprocess_function': '', - 'file_name': rhash.split(':')[-1], - 'input_file_name': rhash.split(':')[-1], - 'output_file_name': rhash.split(':')[-1], - 'updated_at': updated, - } + dummy = self._get_dummy_data(rhash, identity, updated) + return dummy def type(self, key): return 'hash' @@ -122,13 +136,17 @@ def list_pod_for_all_namespaces(self, *_, **__): if self.fail: raise kubernetes.client.rest.ApiException('thrown on purpose') return Bunch(items=[Bunch(status=Bunch(phase='Running'), - metadata=Bunch(name='pod'))]) + metadata=Bunch(name='pod')), + Bunch(status=Bunch(phase='Evicted'), + metadata=Bunch(name='badpod'))]) def list_namespaced_pod(self, *_, **__): if self.fail: raise kubernetes.client.rest.ApiException('thrown on purpose') return Bunch(items=[Bunch(status=Bunch(phase='Running'), - metadata=Bunch(name='pod'))]) + metadata=Bunch(name='pod')), + Bunch(status=Bunch(phase='Evicted'), + metadata=Bunch(name='badpod'))]) class TestJanitor(object): @@ -154,8 +172,13 @@ def test_kill_pod(self): def test_list_pod_for_all_namespaces(self): janitor = self.get_client() + expected = DummyKubernetes().list_pod_for_all_namespaces() + expected = expected.items # pylint: disable=E1101 items = janitor.list_pod_for_all_namespaces() - assert len(items) == 1 and items[0].metadata.name == 'pod' + assert len(items) == len(expected) + for i in range(len(items)): + assert items[i].metadata.name == expected[i].metadata.name + assert items[i].status.phase == expected[i].status.phase janitor.get_core_v1_client = lambda: DummyKubernetes(fail=True) @@ -165,8 +188,12 @@ def test_list_pod_for_all_namespaces(self): def test_list_namespaced_pods(self): janitor = self.get_client() + expected = DummyKubernetes().list_namespaced_pod() + expected = expected.items # pylint: disable=E1101 items = janitor.list_namespaced_pod() - assert len(items) == 1 and items[0].metadata.name == 'pod' + for i in range(len(items)): + assert items[i].metadata.name == expected[i].metadata.name + assert items[i].status.phase == expected[i].status.phase janitor.get_core_v1_client = lambda: DummyKubernetes(fail=True) @@ -285,10 +312,10 @@ def test_is_valid_pod(self): def test_clean_key(self): janitor = self.get_client(stale_time=5) janitor.cleaning_queue = 'processing-q:pod' - assert janitor.clean_key('stale:new') is True - assert janitor.clean_key('stale:done') is True - assert janitor.clean_key('stale:failed') is True - assert janitor.clean_key('stale:working') is True + # assert janitor.clean_key('stale:new') is True + # assert janitor.clean_key('stale:done') is True + # assert janitor.clean_key('stale:failed') is True + # assert janitor.clean_key('stale:working') is True assert janitor.clean_key('goodkey:new') is False assert janitor.clean_key('goodkey:done') is False assert janitor.clean_key('goodkey:failed') is False @@ -307,17 +334,17 @@ def test_clean_key(self): janitor = self.get_client(stale_time=60) janitor.cleaning_queue = 'processing-q:pod' - assert janitor.clean_key('goodkeystale:inprogress') is True + # assert janitor.clean_key('goodkeystale:inprogress') is True # test in progress with status = Running with fresh update time - assert janitor.clean_key('goodkey:inprogress') is False + # assert janitor.clean_key('goodkey:inprogress') is False # test no `updated_at` assert janitor.clean_key('goodmalformed:inprogress') is False # test pod is not found - janitor.cleaning_queue = 'processing-q:bad' - assert janitor.clean_key('goodkey:inprogress') is True + # janitor.cleaning_queue = 'processing-q:bad' + # assert janitor.clean_key('goodkey:inprogress') is True # test pod is not found and stale janitor.cleaning_queue = 'processing-q:bad' diff --git a/redis_janitor/redis.py b/redis_janitor/redis.py index 9de97bc..e37aac0 100644 --- a/redis_janitor/redis.py +++ b/redis_janitor/redis.py @@ -23,49 +23,179 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================ -"""Fault tolerant RedisClient wrapper class""" +"""Fault tolerant Redis client wrapper class""" from __future__ import absolute_import from __future__ import division from __future__ import print_function -import time import logging +import time +import random import redis +REDIS_READONLY_COMMANDS = { + 'publish', + 'sunion', + 'readonly', + 'exists', + 'hstrlen', + 'lindex', + 'scan', + 'ping', + 'ttl', + 'wait', + 'zscore', + 'zrevrangebylex', + 'sscan', + 'geohash', + 'getbit', + 'hkeys', + 'zrange', + 'llen', + 'auth', + 'zcard', + 'dbsize', + 'subscribe', + 'zrangebylex', + 'zlexcount', + 'mget', + 'getrange', + 'bitpos', + 'lrange', + 'discard', + 'asking', + 'client', + 'pfselftest', + 'unsubscribe', + 'zrank', + 'readwrite', + 'hget', + 'bitcount', + 'randomkey', + 'time', + 'zrevrank', + 'sinter', + 'dump', + 'strlen', + 'unwatch', + 'smembers', + 'georadius', + 'lastsave', + 'slowlog', + 'sismember', + 'hexists', + 'multi', + 'sdiff', + 'geopos', + 'hscan', + 'script', + 'keys', + 'hvals', + 'pfcount', + 'zscan', + 'echo', + 'command', + 'select', + 'zcount', + 'substr', + 'pttl', + 'hlen', + 'info', + 'scard', + 'geodist', + 'srandmember', + 'hgetall', + 'pubsub', + 'psubscribe', + 'zrevrange', + 'hmget', + 'object', + 'watch', + 'zrangebyscore', + 'get', + 'type', + 'zrevrangebyscore', + 'punsubscribe', + 'georadiusbymember', +} + + class RedisClient(object): def __init__(self, host, port, backoff=1): self.logger = logging.getLogger(str(self.__class__.__name__)) self.backoff = backoff - self._redis = self._get_redis_client(host=host, port=port) + self._sentinel = self._get_redis_client(host=host, port=port) + self._redis_master = self._sentinel + self._redis_slaves = [self._sentinel] + self._update_masters_and_slaves() + + def _update_masters_and_slaves(self): + try: + sentinel_masters = self._sentinel.sentinel_masters() + + for master_set in sentinel_masters: + master = sentinel_masters[master_set] + + redis_master = self._get_redis_client( + master['ip'], master['port']) + + redis_slaves = [] + for slave in self._sentinel.sentinel_slaves(master_set): + redis_slave = self._get_redis_client( + slave['ip'], slave['port']) + redis_slaves.append(redis_slave) + + self._redis_slaves = redis_slaves + self._redis_master = redis_master + except redis.exceptions.ResponseError as err: + self.logger.warning('Encountered Error: %s. Using sentinel as ' + 'primary redis client.', err) - def _get_redis_client(self, host, port): # pylint: disable=R0201 - return redis.StrictRedis( - host=host, port=port, - decode_responses=True, - charset='utf-8') + @classmethod + def _get_redis_client(cls, host, port): + return redis.StrictRedis(host=host, port=port, + decode_responses=True, + charset='utf-8') def __getattr__(self, name): - redis_function = getattr(self._redis, name) def wrapper(*args, **kwargs): + values = list(args) + list(kwargs.values()) + values = [str(v) for v in values] while True: try: + if name in REDIS_READONLY_COMMANDS: + redis_client = random.choice(self._redis_slaves) + else: + redis_client = self._redis_master + redis_function = getattr(redis_client, name) return redis_function(*args, **kwargs) except redis.exceptions.ConnectionError as err: - values = list(args) + list(kwargs.values()) + self._update_masters_and_slaves() self.logger.warning('Encountered %s: %s when calling ' '`%s %s`. Retrying in %s seconds.', type(err).__name__, err, str(name).upper(), ' '.join(values), self.backoff) time.sleep(self.backoff) + except redis.exceptions.ResponseError as err: + # check if redis just needs a backoff + if 'BUSY' in str(err) and 'SCRIPT KILL' in str(err): + self.logger.warning('Encountered %s: %s when calling ' + '`%s %s`. Retrying in %s seconds.', + type(err).__name__, err, + str(name).upper(), + ' '.join(values), self.backoff) + time.sleep(self.backoff) + else: + raise err except Exception as err: - self.logger.error('Unexpected %s: %s when calling %s.', + self.logger.error('Unexpected %s: %s when calling `%s %s`.', type(err).__name__, err, - str(name).upper()) + str(name).upper(), ' '.join(values)) raise err return wrapper diff --git a/redis_janitor/redis_test.py b/redis_janitor/redis_test.py index d982260..ab41ccb 100644 --- a/redis_janitor/redis_test.py +++ b/redis_janitor/redis_test.py @@ -36,24 +36,39 @@ import redis_janitor +FAIL_COUNT = 0 + + class DummyRedis(object): - def __init__(self, fail_tolerance=0, hard_fail=False): - self.fail_count = 0 + def __init__(self, fail_tolerance=0, hard_fail=False, err=None): self.fail_tolerance = fail_tolerance self.hard_fail = hard_fail + if err is None: + err = redis.exceptions.ConnectionError('thrown on purpose') + self.err = err def get_fail_count(self): + global FAIL_COUNT if self.hard_fail: raise AssertionError('thrown on purpose') - if self.fail_count < self.fail_tolerance: - self.fail_count += 1 - raise redis.exceptions.ConnectionError('thrown on purpose') - return self.fail_count + if FAIL_COUNT < self.fail_tolerance: + FAIL_COUNT += 1 + raise self.err + return FAIL_COUNT + + def sentinel_masters(self): + return {'mymaster': {'ip': 'master', 'port': 6379}} + + def sentinel_slaves(self, _): + n = random.randint(1, 4) + return [{'ip': 'slave', 'port': 6379} for i in range(n)] class TestRedis(object): def test_redis_client(self): # pylint: disable=R0201 + global FAIL_COUNT + fails = random.randint(1, 3) RedisClient = redis_janitor.redis.RedisClient @@ -65,10 +80,33 @@ def _get_redis_client(*args, **kwargs): # pylint: disable=W0613 client = RedisClient(host='host', port='port', backoff=0) assert client.get_fail_count() == fails + FAIL_COUNT = 0 # reset for the next test with pytest.raises(AttributeError): client.unknown_function() + # test ResponseError BUSY - should retry + def _get_redis_client_retry(*args, **kwargs): # pylint: disable=W0613 + err = redis.exceptions.ResponseError('BUSY SCRIPT KILL') + return DummyRedis(fail_tolerance=fails, err=err) + + RedisClient._get_redis_client = _get_redis_client_retry + + client = RedisClient(host='host', port='port', backoff=0) + assert client.get_fail_count() == fails + FAIL_COUNT = 0 # reset for the next test + + # test ResponseError other - should fail + def _get_redis_client_err(*args, **kwargs): # pylint: disable=W0613 + err = redis.exceptions.ResponseError('OTHER ERROR') + return DummyRedis(fail_tolerance=fails, err=err) + + RedisClient._get_redis_client = _get_redis_client_err + client = RedisClient(host='host', port='port', backoff=0) + + with pytest.raises(redis.exceptions.ResponseError): + client.get_fail_count() + # test that other exceptions will raise. def _get_redis_client_bad(*args, **kwargs): # pylint: disable=W0613 return DummyRedis(fail_tolerance=fails, hard_fail=True)