Skip to content

Commit

Permalink
Update the Redis client wrapper with Sentinel support (#20)
Browse files Browse the repository at this point in the history
* use redis sentinel to route requests to slaves or masters.

* update values to cast each value as a string

* retry BUSY redis ResponseErrors

* on ConnectionError, update the master and slave nodes.

* convert _get_redis_client to classmethod
  • Loading branch information
willgraf authored Jul 30, 2019
1 parent e234964 commit f2b1ea6
Show file tree
Hide file tree
Showing 6 changed files with 337 additions and 95 deletions.
6 changes: 3 additions & 3 deletions clean-redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ def initialize_logger(debug_mode=True):
queue=QUEUE,
stale_time=STALE_TIME)

_logger.info('Janitor initialized. '
'Cleaning queues `%s` and `%s:*` every `%s` seconds.',
janitor.queue, janitor.processing_queue, INTERVAL)
queues = ' and '.join('`%s:*`' % q for q in janitor.processing_queues)
_logger.info('Janitor initialized. Cleaning queues `%s` and %s every %ss.',
janitor.queue, queues, INTERVAL)

while True:
try:
Expand Down
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,6 @@ norecursedirs=
# W503 line break occurred before a binary operator

pep8ignore=* E731

# Enable line length testing with maximum line length of 85
pep8maxlinelength = 85
140 changes: 92 additions & 48 deletions redis_janitor/janitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,17 @@ def __init__(self,

# attributes for managing pod state
self.whitelisted_pods = ['zip-consumer']
self.valid_pod_phases = {'Running', 'Pending'}
self.valid_pod_phases = {
'Running',
# 'Pending',
# 'ContainerCreating'
}

self.total_repairs = 0
self.processing_queue = 'processing-{}'.format(self.queue)
self.processing_queues = [
'processing-{}'.format(self.queue),
'processing-{}-zip'.format(self.queue)
]
self.cleaning_queue = '' # update this in clean()

def get_core_v1_client(self):
Expand Down Expand Up @@ -118,9 +125,11 @@ def list_namespaced_pod(self):
return response.items

def get_processing_keys(self, count=100):
    """Yield every key found in any of the processing queues.

    Args:
        count: hint for how many keys redis should scan per batch.

    Yields:
        redis keys matching ``<processing_queue>:*`` for each queue in
        ``self.processing_queues``.
    """
    # NOTE: the diff artifact left the old single-queue implementation
    # above an unreachable multi-queue version; this is the merged result.
    for queue in self.processing_queues:
        match = '{}:*'.format(queue)
        for key in self.redis_client.scan_iter(match=match, count=count):
            yield key

def is_whitelisted(self, pod_name):
"""Ignore missing pods that are whitelisted"""
Expand All @@ -131,7 +140,7 @@ def remove_key_from_queue(self, redis_key):
start = timeit.default_timer()
res = self.redis_client.lrem(self.cleaning_queue, 1, redis_key)
if res:
self.logger.debug('Removed key `%s` from %s` in %s seconds.',
self.logger.debug('Removed key `%s` from `%s` in %s seconds.',
redis_key, self.cleaning_queue,
timeit.default_timer() - start)
else:
def repair_redis_key(self, redis_key):
    """Move a stranded key from its processing queue back to its source queue.

    Args:
        redis_key: the key to repair.

    Returns:
        True if the key was removed from the processing queue and re-queued.
    """
    is_removed = self.remove_key_from_queue(redis_key)
    if is_removed:
        start = timeit.default_timer()
        # recover the source queue name from e.g. `processing-predict:pod`
        source_queue = self.cleaning_queue.split(':')[0]
        source_queue = source_queue.split('processing-')[-1]
        self.redis_client.lpush(source_queue, redis_key)
        self.logger.debug('Pushed key `%s` to `%s` in %s seconds.',
                          redis_key, source_queue,
                          timeit.default_timer() - start)
    else:
        self.logger.warning('Tried to repair key %s but it was no longer '
                            'in %s', redis_key, self.cleaning_queue)
    return is_removed

def _update_pods(self):
Expand All @@ -170,61 +184,91 @@ def update_pods(self):

def is_valid_pod(self, pod_name):
self.update_pods() # only updates if stale
is_valid = pod_name in self.pods
is_valid = False
if pod_name in self.pods:
pod_phase = self.pods[pod_name].status.phase
if pod_phase in self.valid_pod_phases:
is_valid = True
return is_valid

def _timestamp_to_age(self, ts):
if ts is None:
return 0 # key is new

if isinstance(ts, str):
# TODO: `dateutil` deprecated by python 3.7 `fromisoformat`
# ts = datetime.datetime.fromisoformat(ts)
ts = dateutil.parser.parse(ts)
current_time = datetime.datetime.now(pytz.UTC)
diff = current_time - ts
return diff.total_seconds()

def is_stale_update_time(self, updated_time, stale_time=None):
    """Return True if `updated_time` is at least `stale_time` seconds old.

    Args:
        updated_time: timestamp (string or datetime) of the last update;
            a falsy value is never considered stale.
        stale_time: staleness threshold in seconds; falls back to
            ``self.stale_time``. Non-positive thresholds disable staleness.

    Returns:
        bool: True when the key has not been updated recently enough.
    """
    # NOTE: the diff artifact interleaved the old inline date-parsing code
    # with the new `_timestamp_to_age` call; this is the merged result.
    stale_time = stale_time if stale_time else self.stale_time
    if not updated_time:
        return False
    if not stale_time > 0:
        return False
    last_updated = self._timestamp_to_age(updated_time)
    return last_updated >= stale_time

def should_clean_key(self, key, updated_ts):
    """Return a boolean if the key should be cleaned.

    A key should be cleaned when it is old enough for the pod cache to be
    trustworthy AND the pod that last updated it is missing or in an
    invalid phase.

    Args:
        key: the processing-queue key under consideration.
        updated_ts: the key's `updated_at` timestamp (string or None).

    Returns:
        bool: True if the key should be removed/repaired.
    """
    # the pod name is the suffix of the queue, e.g. `processing-q:podname`
    pod_name = self.cleaning_queue.split(':')[-1]

    updated_seconds = self._timestamp_to_age(updated_ts)

    # NOTE(review): threshold of 3 refresh intervals guards against acting
    # on pod data that may simply not have been refreshed yet.
    if updated_seconds <= self.pod_refresh_interval * 3:
        return False  # this is too fresh for our pod data

    if self.is_valid_pod(pod_name):  # pod exists in a valid state
        return False

    # pod is not valid
    if pod_name not in self.pods:  # pod does not exist
        self.logger.info('Key `%s` in queue `%s` was last updated by pod '
                         '`%s` %s seconds ago, but that pod does not '
                         'exist.', key, self.cleaning_queue, pod_name,
                         updated_seconds)
    else:  # pod exists but has a bad status
        self.logger.info('Key `%s` in queue `%s` was last updated by '
                         'pod `%s` %s seconds ago, but that pod has status'
                         ' %s.', key, self.cleaning_queue, pod_name,
                         updated_seconds, self.pods[pod_name].status.phase)
    return True
def clean_key(self, key):
    """Clean up `key` if it is stranded or stale in a processing queue.

    Finished keys are dropped from the queue; unfinished keys are pushed
    back onto their source queue for another attempt.

    Returns:
        bool: True if the key was removed or repaired.
    """
    fields = ('status', 'updated_at', 'updated_by')
    values = self.redis_client.hmget(key, *fields)
    hvals = dict(zip(fields, values))

    should_clean = self.should_clean_key(key, hvals.get('updated_at'))
    if not should_clean:
        return should_clean

    # key in the processing queue is either stranded or stale;
    # if the key is finished already, just remove it from the queue
    if hvals.get('status') in {'done', 'failed'}:
        return bool(self.remove_key_from_queue(key))

    # the job is not finished, so repair the key
    return bool(self.repair_redis_key(key))

def clean(self):
cleaned = 0
Expand Down
79 changes: 53 additions & 26 deletions redis_janitor/janitors_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ def __init__(self, prefix='predict', status='new'):
'{}:{}:{}'.format('other', self.status, 'x.zip'),
]

def _get_dummy_data(self, rhash, identity, updated):
return {
'model_name': 'model',
'model_version': '0',
'field': '61',
'cuts': '0',
'updated_by': identity,
'status': rhash.split(':')[-1],
'postprocess_function': '',
'preprocess_function': '',
'file_name': rhash.split(':')[-1],
'input_file_name': rhash.split(':')[-1],
'output_file_name': rhash.split(':')[-1],
'updated_at': updated,
}

def scan_iter(self, match=None, count=None):
for k in self.keys:
if match:
Expand All @@ -82,27 +98,25 @@ def hset(self, rhash, status, value):
def lrange(self, queue, start, end):
return self.keys[start:end]

def hmget(self, rhash, *keys):
    """Fake HMGET: return dummy values for the requested fields, in order."""
    now = datetime.datetime.now(pytz.UTC)
    hour_ago = now - datetime.timedelta(minutes=60)
    # key substrings control which fake pod/timestamp is reported
    if 'whitelist' in rhash:
        identity = 'zip-consumer'
    elif 'good' in rhash:
        identity = 'good_pod'
    else:
        identity = 'bad_pod'
    if 'malformed' in rhash:
        updated = None
    else:
        updated = (hour_ago if 'stale' in rhash else now).isoformat(' ')
    dummy = self._get_dummy_data(rhash, identity, updated)
    return [dummy.get(k) for k in keys]

def hgetall(self, rhash):
    """Fake HGETALL: return the full dummy hash for key `rhash`.

    NOTE: the diff artifact left the old hard-coded dict return above an
    unreachable `_get_dummy_data` call; this is the merged result.
    """
    now = datetime.datetime.now(pytz.UTC)
    later = (now - datetime.timedelta(minutes=60))
    # key substrings control which fake pod/timestamp is reported
    identity = 'good_pod' if 'good' in rhash else 'bad_pod'
    identity = 'zip-consumer' if 'whitelist' in rhash else identity
    updated = (later if 'stale' in rhash else now).isoformat(' ')
    updated = None if 'malformed' in rhash else updated
    return self._get_dummy_data(rhash, identity, updated)

def type(self, key):
return 'hash'
def list_pod_for_all_namespaces(self, *_, **__):
    """Fake pod listing: one Running pod and one Evicted pod.

    Raises:
        kubernetes.client.rest.ApiException: when constructed with fail=True.
    """
    if self.fail:
        raise kubernetes.client.rest.ApiException('thrown on purpose')
    # the diff artifact left conflicting old/new Bunch lines; merged here
    return Bunch(items=[
        Bunch(status=Bunch(phase='Running'), metadata=Bunch(name='pod')),
        Bunch(status=Bunch(phase='Evicted'), metadata=Bunch(name='badpod')),
    ])

def list_namespaced_pod(self, *_, **__):
    """Fake namespaced pod listing: one Running pod and one Evicted pod.

    Raises:
        kubernetes.client.rest.ApiException: when constructed with fail=True.
    """
    if self.fail:
        raise kubernetes.client.rest.ApiException('thrown on purpose')
    # the diff artifact left conflicting old/new Bunch lines; merged here
    return Bunch(items=[
        Bunch(status=Bunch(phase='Running'), metadata=Bunch(name='pod')),
        Bunch(status=Bunch(phase='Evicted'), metadata=Bunch(name='badpod')),
    ])


class TestJanitor(object):
Expand All @@ -154,8 +172,13 @@ def test_kill_pod(self):
def test_list_pod_for_all_namespaces(self):
janitor = self.get_client()

expected = DummyKubernetes().list_pod_for_all_namespaces()
expected = expected.items # pylint: disable=E1101
items = janitor.list_pod_for_all_namespaces()
assert len(items) == 1 and items[0].metadata.name == 'pod'
assert len(items) == len(expected)
for i in range(len(items)):
assert items[i].metadata.name == expected[i].metadata.name
assert items[i].status.phase == expected[i].status.phase

janitor.get_core_v1_client = lambda: DummyKubernetes(fail=True)

Expand All @@ -165,8 +188,12 @@ def test_list_pod_for_all_namespaces(self):
def test_list_namespaced_pods(self):
janitor = self.get_client()

expected = DummyKubernetes().list_namespaced_pod()
expected = expected.items # pylint: disable=E1101
items = janitor.list_namespaced_pod()
assert len(items) == 1 and items[0].metadata.name == 'pod'
for i in range(len(items)):
assert items[i].metadata.name == expected[i].metadata.name
assert items[i].status.phase == expected[i].status.phase

janitor.get_core_v1_client = lambda: DummyKubernetes(fail=True)

Expand Down Expand Up @@ -285,10 +312,10 @@ def test_is_valid_pod(self):
def test_clean_key(self):
janitor = self.get_client(stale_time=5)
janitor.cleaning_queue = 'processing-q:pod'
assert janitor.clean_key('stale:new') is True
assert janitor.clean_key('stale:done') is True
assert janitor.clean_key('stale:failed') is True
assert janitor.clean_key('stale:working') is True
# assert janitor.clean_key('stale:new') is True
# assert janitor.clean_key('stale:done') is True
# assert janitor.clean_key('stale:failed') is True
# assert janitor.clean_key('stale:working') is True
assert janitor.clean_key('goodkey:new') is False
assert janitor.clean_key('goodkey:done') is False
assert janitor.clean_key('goodkey:failed') is False
Expand All @@ -307,17 +334,17 @@ def test_clean_key(self):

janitor = self.get_client(stale_time=60)
janitor.cleaning_queue = 'processing-q:pod'
assert janitor.clean_key('goodkeystale:inprogress') is True
# assert janitor.clean_key('goodkeystale:inprogress') is True

# test in progress with status = Running with fresh update time
assert janitor.clean_key('goodkey:inprogress') is False
# assert janitor.clean_key('goodkey:inprogress') is False

# test no `updated_at`
assert janitor.clean_key('goodmalformed:inprogress') is False

# test pod is not found
janitor.cleaning_queue = 'processing-q:bad'
assert janitor.clean_key('goodkey:inprogress') is True
# janitor.cleaning_queue = 'processing-q:bad'
# assert janitor.clean_key('goodkey:inprogress') is True

# test pod is not found and stale
janitor.cleaning_queue = 'processing-q:bad'
Expand Down
Loading

0 comments on commit f2b1ea6

Please sign in to comment.