diff --git a/bothub/nlu_worker/interpreter_manager.py b/bothub/nlu_worker/interpreter_manager.py index 9b953df..00d66f7 100644 --- a/bothub/nlu_worker/interpreter_manager.py +++ b/bothub/nlu_worker/interpreter_manager.py @@ -1,34 +1,75 @@ +import logging +import threading +import time +import gc + +from typing import Callable, Union from rasa.nlu import components from tempfile import mkdtemp +from datetime import datetime +from bothub import settings from bothub.shared.utils.persistor import BothubPersistor from bothub.shared.utils.backend import backend from bothub.shared.utils.rasa_components.bothub_interpreter import BothubInterpreter +logger = logging.getLogger(__name__) + + +class SetInterval: + """ + Creates a thread that execute a function every x seconds + """ + def __init__(self, interval: Union[int, float], action: Callable): + """ + :param interval: Period in seconds + :param action: Callable function + """ + self.interval = interval + self.action = action + self.stopEvent = threading.Event() + thread = threading.Thread(target=self._set_interval, daemon=True) + thread.start() + + def _set_interval(self): + next_time = time.time() + self.interval + while not self.stopEvent.wait(next_time - time.time()): + next_time += self.interval + self.action() + + def cancel(self): + self.stopEvent.set() + class InterpreterManager: def __init__(self): self.cached_interpreters = {} + SetInterval(settings.WORKER_CACHE_CLEANING_PERIOD, self._clean_cache) def get_interpreter( - self, repository_version, repository_authorization, rasa_version, use_cache=True - ): + self, + repository_version, + repository_authorization, + rasa_version, + use_cache=True + ) -> BothubInterpreter: update_request = backend().request_backend_parse_nlu_persistor( repository_version, repository_authorization, rasa_version, no_bot_data=True ) repository_name = ( - f"{update_request.get('version_id')}_" f"{update_request.get('language')}" + f"{update_request.get('version_id')}_{update_request.get('language')}" ) last_training = f"{update_request.get('total_training_end')}" # tries to fetch cache - cached_retrieved = self.cached_interpreters.get(repository_name) - if cached_retrieved and use_cache: - # returns cache only if it's the same training - if cached_retrieved["last_training"] == last_training: - return cached_retrieved["interpreter_data"] + retrieved_cache = self.cached_interpreters.get(repository_name) + if retrieved_cache and use_cache: + # retrieve cache only if it's the same training + if retrieved_cache["last_training"] == last_training: + retrieved_cache["last_request"] = datetime.now() + return retrieved_cache["interpreter_data"] persistor = BothubPersistor( repository_version, repository_authorization, rasa_version @@ -43,11 +84,29 @@ def get_interpreter( model_directory, components.ComponentBuilder(use_cache=False) ) - # update/creates cache - if use_cache: + if use_cache: # update/creates cache self.cached_interpreters[repository_name] = { "last_training": last_training, "interpreter_data": interpreter, + "last_request": datetime.now() } return interpreter + + def _clean_cache(self) -> None: + logger.info("Cleaning repositories cache") + cur_time = datetime.now() + + to_remove = [] + for interpreter in self.cached_interpreters: + last_request = self.cached_interpreters[interpreter]['last_request'] + idle_time = (cur_time - last_request).total_seconds() + if idle_time > settings.INTERPRETER_CACHE_IDLE_LIMIT: + to_remove.append(interpreter) + + for interpreter in to_remove: + del self.cached_interpreters[interpreter] + + logger.info(f"{len(to_remove)} interpreters cleaned") + objects_collected = gc.collect() + logger.info(f"{objects_collected} objects collected") diff --git a/bothub/settings.py b/bothub/settings.py new file mode 100644 index 0000000..9d2ccca --- /dev/null +++ b/bothub/settings.py @@ -0,0 +1,10 @@ +from decouple import config + +# Period of time (seconds) the worker will look for idle interpreters to free space +WORKER_CACHE_CLEANING_PERIOD = config( + "WORKER_CACHE_CLEANING_PERIOD", cast=float, default=3*3600 +) +# Idle limit of time (seconds) the interpreter will be cached +INTERPRETER_CACHE_IDLE_LIMIT = config( + "INTERPRETER_CACHE_IDLE_LIMIT", cast=float, default=24*3600 +) diff --git a/nlp.Dockerfile b/nlp.Dockerfile index 5363659..2fd7933 100644 --- a/nlp.Dockerfile +++ b/nlp.Dockerfile @@ -46,9 +46,8 @@ RUN pip install -U pip setuptools RUN pip install --find-links=${PYTHON_WHEELS_PATH} ${PIP_REQUIREMENTS} -COPY bothub/nlu_worker ${WORKDIR}/bothub/nlu_worker -COPY bothub/shared ${WORKDIR}/bothub/shared -COPY bothub/__init__.py ${WORKDIR}/bothub +COPY bothub ${WORKDIR}/bothub + COPY start_celery.py . COPY celery_app.py .